ソースを参照

starpupy:replace struct codelet_args by using starpu_codelet_pack/unpack

HE Kun 4 年 前
コミット
f777063bed
共有3 個のファイルを変更した145 個の追加40 個の削除を含む
  1. 21 0
      include/starpu_task_util.h
  2. 32 0
      src/util/starpu_task_insert_utils.c
  3. 92 40
      starpupy/src/starpu_task_wrapper.c

+ 21 - 0
include/starpu_task_util.h

@@ -510,6 +510,27 @@ void starpu_codelet_pack_arg_fini(struct starpu_codelet_pack_arg_data *state, vo
 void starpu_codelet_unpack_args(void *cl_arg, ...);
 
 /**
+   Initialize struct starpu_codelet_pack_arg_data before calling
+   starpu_codelet_unpack_arg(). This will pass the starpu_task->cl_arg 
+   and starpu_task->cl_arg_size to the content of struct starpu_codelet_pack_arg_data.
+*/
+void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state, void **cl_arg, size_t *cl_arg_size);
+
+/**
+   Unpack one argument from struct starpu_codelet_pack_arg \p state into ptr.
+   That structure has to be initialized before with starpu_codelet_unpack_arg_init().
+*/
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void **ptr);
+
+void starpu_codelet_unpack_arg_fini(struct starpu_codelet_pack_arg_data *state);
+
+/**
+   Call this function during unpacking to skip saving the argument in ptr.
+*/
+void starpu_codelet_unpack_discard_arg(struct starpu_codelet_pack_arg_data *state);
+
+
+/**
    Similar to starpu_codelet_unpack_args(), but if any parameter is 0,
    copy the part of \p cl_arg that has not been read in \p buffer
    which can then be used in a later call to one of the unpack

+ 32 - 0
src/util/starpu_task_insert_utils.c

@@ -63,6 +63,38 @@ void starpu_codelet_pack_arg_fini(struct starpu_codelet_pack_arg_data *state, vo
 	*cl_arg_size = state->arg_buffer_size;
 }
 
+void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state, void **cl_arg, size_t *cl_arg_size)
+{
+	state->arg_buffer = *cl_arg;
+	state->arg_buffer_size = *cl_arg_size;
+	state->current_offset = sizeof(int);
+	state->nargs = 0;
+}
+
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void **ptr)
+{
+	size_t ptr_size;
+	memcpy((void *)&ptr_size, state->arg_buffer+state->current_offset, sizeof(ptr_size));
+	state->current_offset += sizeof(ptr_size);
+	*ptr =(void **)malloc(ptr_size);
+	memcpy(ptr, state->arg_buffer+state->current_offset, ptr_size);
+	state->current_offset += ptr_size;
+	state->nargs++;
+}
+
+void starpu_codelet_unpack_arg_fini(struct starpu_codelet_pack_arg_data *state)
+{
+
+}
+
+void starpu_codelet_unpack_discard_arg(struct starpu_codelet_pack_arg_data *state)
+{
+	size_t ptr_size;
+	memcpy((void *)&ptr_size, state->arg_buffer+state->current_offset, sizeof(ptr_size));
+	state->current_offset += sizeof(ptr_size);
+	state->current_offset += ptr_size;
+}
+
 int _starpu_codelet_pack_args(void **arg_buffer, size_t *arg_buffer_size, va_list varg_list)
 {
 	int arg_type;

+ 92 - 40
starpupy/src/starpu_task_wrapper.c

@@ -42,28 +42,42 @@ extern void _Py_CountReferences(FILE*);
 
 /*********************Functions passed in task_submit wrapper***********************/
 
-static PyObject *asyncio_module; /*python asyncio library*/
+static PyObject *asyncio_module; /*python asyncio module*/
+static PyObject *cloudpickle_module; /*cloudpickle module*/
 
 /*structure contains parameters which are passed to starpu_task.cl_arg*/
-struct codelet_args
+// struct codelet_args
+// {
+// 	PyObject *f; /*the python function passed in*/
+// 	PyObject *argList; /*argument list of python function passed in*/
+// 	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
+// 	PyObject *fut; /*asyncio.Future*/
+// 	PyObject *lp; /*asyncio.Eventloop*/
+// };
+
+/*function passed to starpu_codelet.cpu_func*/
+void starpupy_codelet_func(void *buffers[], void *cl_arg)
 {
-	PyObject *f; /*the python function passed in*/
+	PyObject *func_py; /*the python function passed in*/
 	PyObject *argList; /*argument list of python function passed in*/
-	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
 	PyObject *fut; /*asyncio.Future*/
-	PyObject *lp; /*asyncio.Eventloop*/
-};
+	PyObject *loop; /*asyncio.Eventloop*/
+	//struct codelet_args *cst = (struct codelet_args*) cl_arg;
 
-/*function passed to starpu_codelet.cpu_func*/
-void codelet_func(void *buffers[], void *cl_arg)
-{
-	struct codelet_args *cst = (struct codelet_args*) cl_arg;
+	struct starpu_task *task = starpu_task_get_current();
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+
+	starpu_codelet_unpack_arg(&data, &func_py);
 
+	starpu_codelet_unpack_arg(&data, &argList);
+	starpu_codelet_unpack_arg(&data, &fut);
+	starpu_codelet_unpack_arg(&data, &loop);
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
 	/*verify that the function is a proper callable*/
-	if (!PyCallable_Check(cst->f))
+	if (!PyCallable_Check(func_py))
 	{
 		printf("py_callback: expected a callable function\n");
 		exit(1);
@@ -71,16 +85,16 @@ void codelet_func(void *buffers[], void *cl_arg)
 
 	/*check the arguments of python function passed in*/
 	int i;
-	for(i=0; i < PyTuple_Size(cst->argList); i++)
+	for(i=0; i < PyTuple_Size(argList); i++)
 	{
-		PyObject *obj = PyTuple_GetItem(cst->argList, i);
+		PyObject *obj = PyTuple_GetItem(argList, i);
 		const char *tp = Py_TYPE(obj)->tp_name;
 		if(strcmp(tp, "_asyncio.Future") == 0)
 		{
 			/*if one of arguments is Future, get its result*/
 			PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
 			/*replace the Future argument to its result*/
-			PyTuple_SetItem(cst->argList, i, fut_result);
+			PyTuple_SetItem(argList, i, fut_result);
 		}
 		/*else if (strcmp(tp, "numpy.ndarray")==0)
 		  {
@@ -88,11 +102,13 @@ void codelet_func(void *buffers[], void *cl_arg)
 		  }*/
 	}
 
-	/*call the python function*/
-	PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
-	//const char *tp = Py_TYPE(pRetVal)->tp_name;
-	//printf("return value type is %s\n", tp);
-	cst->rv = pRetVal;
+	/*call the python function get the return value rv*/
+	//PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
+	//cst->rv = pRetVal;
+	PyObject *rv = PyObject_CallObject(func_py, argList);
+	//if rv==NULL
+	starpu_codelet_pack_arg(&data, &rv, sizeof(rv));
+    starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
 	//Py_DECREF(cst->f);
 
@@ -103,22 +119,33 @@ void codelet_func(void *buffers[], void *cl_arg)
 /*function passed to starpu_task.callback_func*/
 void cb_func(void *v)
 {
+	PyObject *fut; /*asyncio.Future*/
+	PyObject *loop; /*asyncio.Eventloop*/
+	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
+
 	struct starpu_task *task = starpu_task_get_current();
-	struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_arg(&data, &fut);
+	starpu_codelet_unpack_arg(&data, &loop);
+	starpu_codelet_unpack_arg(&data, &rv);
 
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
 	/*set the Future result and mark the Future as done*/
-	PyObject *set_result = PyObject_GetAttrString(cst->fut, "set_result");
-	PyObject *loop_callback = PyObject_CallMethod(cst->lp, "call_soon_threadsafe", "(O,O)", set_result, cst->rv);
+	PyObject *set_result = PyObject_GetAttrString(fut, "set_result");
+	PyObject *loop_callback = PyObject_CallMethod(loop, "call_soon_threadsafe", "(O,O)", set_result, rv);
 
 	Py_DECREF(loop_callback);
 	Py_DECREF(set_result);
-	Py_DECREF(cst->rv);
-	Py_DECREF(cst->fut);
-	Py_DECREF(cst->lp);
-	Py_DECREF(cst->argList);
+	Py_DECREF(rv);
+	Py_DECREF(fut);
+	Py_DECREF(loop);
+	//Py_DECREF(argList);
 
 	//Py_DECREF(perfmodel);
 	struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
@@ -161,10 +188,19 @@ static PyObject *PyTask_FromTask(struct starpu_task *task)
 static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 {
 	int n=0;
-	struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	PyObject *func_py; /*the python function passed in*/
+	PyObject *argList; /*argument list of python function passed in*/
+	PyObject *fut; /*asyncio.Future*/
+	PyObject *loop; /*asyncio.Eventloop*/
+	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
+
+	struct starpu_task *task_submit = starpu_task_get_current();
+	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	starpu_codelet_unpack_args(task_submit->cl_arg, &func_py, &argList, &fut, &loop, &rv);
 
 	/*get the result of function*/
-	PyObject *obj=cst->rv;
+	PyObject *obj=rv;
 	/*get the length of result*/
 	const char *tp = Py_TYPE(obj)->tp_name;
 #ifdef STARPU_PYTHON_HAVE_NUMPY
@@ -261,6 +297,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 
 	/*first argument in args is always the python function passed in*/
 	PyObject *func_py = PyTuple_GetItem(args, 0);
+
+	Py_INCREF(fut);
+	Py_INCREF(loop);
 	Py_INCREF(func_py);
 
 	/*allocate a task structure and initialize it with default values*/
@@ -292,7 +331,8 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
 	/*initialize func_cl with default values*/
 	starpu_codelet_init(func_cl);
-	func_cl->cpu_funcs[0]=&codelet_func;
+	func_cl->cpu_funcs[0]=&starpupy_codelet_func;
+	func_cl->cpu_funcs_name[0]="starpupy_codelet_func";
 
 	/*check whether the option perfmodel is None*/
 	PyObject *dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
@@ -307,31 +347,41 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	}
 
 	/*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
-	struct codelet_args *cst = (struct codelet_args*)malloc(sizeof(struct codelet_args));
-	cst->f = func_py;
-	cst->fut = fut;
-	cst->lp = loop;
+	//struct codelet_args *cst = (struct codelet_args*)malloc(sizeof(struct codelet_args));
+	//cst->f = func_py;
+	//cst->fut = fut;
+	//cst->lp = loop;
 
-	Py_INCREF(fut);
-	Py_INCREF(loop);
+	/*Initialize struct starpu_codelet_pack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_pack_arg_init(&data);
+
+	/*argument list of python function passed in*/
+	PyObject *argList;
 
 	/*pass args in argList*/
 	if (PyTuple_Size(args)==2)/*function no arguments*/
-		cst->argList = PyTuple_New(0);
+		argList = PyTuple_New(0);
 	else
 	{/*function has arguments*/
-		cst->argList = PyTuple_New(PyTuple_Size(args)-2);
+		argList = PyTuple_New(PyTuple_Size(args)-2);
 		int i;
 		for(i=0; i < PyTuple_Size(args)-2; i++)
 		{
 			PyObject *tmp=PyTuple_GetItem(args, i+1);
-			PyTuple_SetItem(cst->argList, i, tmp);
-			Py_INCREF(PyTuple_GetItem(cst->argList, i));
+			PyTuple_SetItem(argList, i, tmp);
+			Py_INCREF(PyTuple_GetItem(argList, i));
 		}
 	}
+	
+	starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
+	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
+	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
+	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
 
 	task->cl=func_cl;
-	task->cl_arg=cst;
+	//task->cl_arg=cst;
+	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
 	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None*/
 	/*const char * name*/
@@ -525,6 +575,8 @@ PyInit_starpupy(void)
 	assert(ret==0);
 	/*python asysncio import*/
 	asyncio_module = PyImport_ImportModule("asyncio");
+	/*cloudpickle import*/
+	cloudpickle_module = PyImport_ImportModule("cloudpickle");
 #ifdef STARPU_PYTHON_HAVE_NUMPY
 	/*numpy import array*/
 	import_array();