Sfoglia il codice sorgente

starpupy: support the function which takes int/float as input and output

HE Kun 4 anni fa
parent
commit
a47579dc72

+ 8 - 3
starpupy/src/starpu/delay.py

@@ -1,5 +1,10 @@
 from starpu import task
+import asyncio
+
 def delayed(f):
-	def submit():
-		task.task_submit(f)
-	return submit
+	def submit(*args,**kwargs):
+		async def fut_wait():
+			fut = task.task_submit(f, list(args))
+			res = await fut
+		asyncio.run(fut_wait())
+	return submit

+ 109 - 77
starpupy/src/starpu/starpu_task_wrapper.c

@@ -6,110 +6,142 @@
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
 
-/********************preparation of calling python function*************************/
-//static PyObject* func_py;
-/*call python function no input no output*/
-static void py_callback(PyObject * func_py){
-	PyObject *pRetVal;
+/*********************Functions passed in task_submit wrapper***********************/
 
-    // make sure we own the GIL
-    PyGILState_STATE state = PyGILState_Ensure();
-    //printf("func_py in py_callback is %p\n", func_py);
-    // verify that func is a proper callable
-    if (!PyCallable_Check(func_py)) {
+static PyObject* asyncio_module; /*python asyncio library*/
 
-        printf("py_callback: expected a callablen\n"); 
-        exit(-1);
-    }
+/*structure contains parameters which are passed to starpu_task.cl_arg*/
+struct codelet_struct { 
+    PyObject* f; /*the python function passed in*/
+    PyObject* argList; /*argument list of python function passed in*/
+    PyObject* rv; /*return value when using PyObject_CallObject call the function f*/
+    PyObject* fut; /*asyncio.Future*/
+    PyObject* lp; /*asyncio.Eventloop*/
+};
+typedef struct codelet_struct codelet_st;
+
+/*function passed to starpu_codelet.cpu_func*/
+void codelet_func(void *buffers[], void *cl_arg){
+
+    codelet_st* cst = (codelet_st*) cl_arg;
 
-    // call the function
-    pRetVal = PyObject_CallObject(func_py, NULL);
-    Py_DECREF(func_py);
+    /*make sure we own the GIL*/
+    PyGILState_STATE state = PyGILState_Ensure();
 
-    // check for Python exceptions
-    if (PyErr_Occurred()) {
+    /*verify that the function is a proper callable*/
+    if (!PyCallable_Check(cst->f)) {
 
-        PyErr_Print(); 
-        exit(-1);
+        printf("py_callback: expected a callablen\n"); 
+        exit(1);
     }
-	Py_DECREF(pRetVal);
+    
+    /*call the python function*/
+    PyObject *pRetVal = PyObject_CallObject(cst->f, PyList_AsTuple(cst->argList));
+    cst->rv=pRetVal;
+
+    Py_DECREF(cst->f);
+    for(int i = 0; i < PyList_Size(cst->argList); i++){
+        Py_DECREF(PyList_GetItem(cst->argList, i));
+    }
+    Py_DECREF(cst->argList);
 
-	//printf("finish callback\n");
-    // restore previous GIL state and return 
+    /*restore previous GIL state*/
     PyGILState_Release(state);
 }
-/***********************************************************************************/
 
-/*****************************Methods and their wrappers****************************/
+/*function passed to starpu_task.callback_func*/
+void cb_func(void *v){
 
-/*structure contains type of parameters*/
-struct codelet_struct { 
-    PyObject* f; //function no input no output
-};
-typedef struct codelet_struct codelet_st;
+	struct starpu_task *task=starpu_task_get_current();
+    codelet_st* cst = (codelet_st*) task->cl_arg;
 
-/*cpu_func*/
-void codelet_func(void *buffers[], void *cl_arg){
-    //printf("begin to print in codelet_func\n");
-    codelet_st* cst = (codelet_st*) cl_arg;
-    py_callback(cst->f);
-    //printf("finish to print in codelet_func\n");
-}
+    /*make sure we own the GIL*/
+    PyGILState_STATE state = PyGILState_Ensure();
 
-/*call back function to deallocate task*/
-void cb_func(void*f){
-	struct starpu_task *task=starpu_task_get_current();
-	free(task->cl);
-	free(task->cl_arg);
+    /*set the Future result and mark the Future as done*/
+    PyObject * set_result = PyObject_GetAttrString(cst->fut, "set_result");
+    PyObject * loop_callback = PyObject_CallMethod(cst->lp, "call_soon_threadsafe", "(O,O)", set_result, cst->rv);
+
+    Py_DECREF(loop_callback);
+    Py_DECREF(set_result);
+    Py_DECREF(cst->rv);
+    Py_DECREF(cst->fut);
+    Py_DECREF(cst->lp);
+
+    /*restore previous GIL state*/
+    PyGILState_Release(state);
+
+    /*deallocate task*/
+    free(task->cl);
+	  free(task->cl_arg);
 }
 
+/***********************************************************************************/
+
+/*****************************Wrappers of StarPU methods****************************/
 /*wrapper submit method*/
 static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
-	PyObject* func_py;
 
-	if (!PyArg_ParseTuple(args, "O", &func_py))
+    /*get the running Event loop*/
+    PyObject* loop = PyObject_CallMethod(asyncio_module, "get_running_loop", NULL);
+    /*create a asyncio.Future object*/
+    PyObject* fut = PyObject_CallMethod(loop, "create_future", NULL);
+    /*the python function passed in*/
+    PyObject* func_py;
+    /*the args of function*/
+    PyObject* pArgs;
+
+	if (!PyArg_ParseTuple(args, "OO", &func_py, &pArgs))
 		return NULL;
-	//printf("func_py in wrapper is %p\n", func_py);
 
-	//call submit method
-	//allocate a task structure and initialize it with default values 
+	/*allocate a task structure and initialize it with default values*/
     struct starpu_task *task=starpu_task_create();
-
-    //allocate a codelet structure
+    /*allocate a codelet structure*/
     struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
-    //initialize func_cl with default values
+    /*initialize func_cl with default values*/
     starpu_codelet_init(func_cl);
     func_cl->cpu_func=&codelet_func;
 
-    //allocate a new structure to pass the function python
-    codelet_st *cst;
-    cst = (codelet_st*)malloc(sizeof(codelet_st));
+    /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
+    codelet_st *cst = (codelet_st*)malloc(sizeof(codelet_st));
     cst->f = func_py;
+    cst->fut = fut;
+    cst->lp = loop;
     Py_INCREF(func_py);
+    Py_INCREF(fut);
+    Py_INCREF(loop);
+
+    /*allocate a new list of length*/
+    cst->argList = PyList_New (PyList_Size(pArgs));
+
+    /*pass pArgs in argList*/
+    for(int i = 0; i < PyList_Size(pArgs); i++){
+        PyList_SetItem(cst->argList, i, pArgs);
+        Py_INCREF(PyList_GetItem(cst->argList, i));
+    }
+    cst->argList = pArgs;
+    Py_INCREF(pArgs);
 
     task->cl=func_cl;
     task->cl_arg=cst;
 
+    /*call starpu_task_submit method*/
     int retval=starpu_task_submit(task);
-    printf("finish to submit task, result is %d\n", retval);
 
     task->callback_func=&cb_func;
 
-    //return type is void
-    Py_INCREF(Py_None);
-    return Py_None;
-	
+    return fut;	
 }
 
 /*wrapper wait for all method*/
 static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args){
 
-	//call wait for all method
+	/*call starpu_task_wait_for_all method*/
 	Py_BEGIN_ALLOW_THREADS
 	starpu_task_wait_for_all();
 	Py_END_ALLOW_THREADS
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -117,10 +149,10 @@ static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args
 /*wrapper pause method*/
 static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args){
 
-	//call pause method
+	/*call starpu_pause method*/
 	starpu_pause();
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -128,10 +160,10 @@ static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args){
 /*wrapper resume method*/
 static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
 
-	//call resume method
+	/*call starpu_resume method*/
 	starpu_resume();
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -142,29 +174,30 @@ static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
 /*method table*/
 static PyMethodDef taskMethods[] = 
 { 
-  {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, //submit method
-  {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, //wait for all method
-  {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, //pause method
-  {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, //resume method
+  {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
+  {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, /*wait for all method*/
+  {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
+  {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
   {NULL, NULL}
 };
 
 /*deallocation function*/
-static void taskFree(void* f){
+static void taskFree(void *v){
 	starpu_shutdown();
+    Py_DECREF(asyncio_module);
 }
 
-/*the method table must be referenced in the module definition structure*/
+/*module definition structure*/
 static struct PyModuleDef taskmodule={
   PyModuleDef_HEAD_INIT,
   "task", /*name of module*/
   NULL,
   -1,
-  taskMethods,
+  taskMethods, /*method table*/
   NULL,
   NULL,
   NULL,
-  taskFree
+  taskFree /*deallocation function*/
 };
 
 /*initialization function*/
@@ -172,12 +205,11 @@ PyMODINIT_FUNC
 PyInit_task(void)
 {
     PyEval_InitThreads();
-    //starpu initialization
-    printf("begin initialization\n");
+    /*starpu initialization*/
 	int ret = starpu_init(NULL);
-	printf("finish initialization, result is %d\n",ret);
-
-    //python import initialization
+    /*python asysncio import*/
+    asyncio_module = PyImport_ImportModule("asyncio");
+    /*module import initialization*/
     return PyModule_Create(&taskmodule);
 }
 /***********************************************************************************/

+ 93 - 17
starpupy/tests/starpu_py.py

@@ -1,28 +1,104 @@
 import starpu
 import time
+import asyncio
 
+############################################################################
+#function no input no output print hello world
+def hello():
+	print ("Example 1:")
+	print ("Hello, world!")
+
+#submit function "hello"
+async def hello_wait():
+    fut = starpu.task_submit(hello,[])
+    await fut
+asyncio.run(hello_wait())
+
+#############################################################################
+
+#function no input no output
+def func1():
+	print ("Example 2:")
+	print ("This is a function no input no output")
+
+#submit function "func1"
+async def func1_wait():
+    fut1 = starpu.task_submit(func1,[])
+    await fut1
+asyncio.run(func1_wait())
+
+##############################################################################
+
+#using decorator wrap the function no input no output
 @starpu.delayed
-def salut():
-	time.sleep(1)
-	print ("salut, le monde")
+def func1_deco():
+	#time.sleep(1)
+	print ("Example 3:")
+	print ("This is a function wrapped by the decorator function")
 
+#apply starpu.delayed(func1_deco())
+func1_deco()
 
-def hello():
-	print ("print in python")
-	print ("Hello, world!")
+##############################################################################
+
+#function no input return a value
+def func2():
+	print ("Example 4:")
+	return 12
+
+#submit function "func2"
+async def func2_wait():
+    fut2 = starpu.task_submit(func2, [])
+    res2 = await fut2
+    #print the result of function
+    print("This is a function no input and the return value is", res2)
+asyncio.run(func2_wait())
+
+###############################################################################
+ 
+#function has 2 int inputs and 1 int output
+def multi(a,b):
+	print ("Example 5:")
+	return a*b
+
+#submit function "multi"
+async def multi_wait():
+	fut3 = starpu.task_submit(multi, [2, 3])
+	res3=await fut3
+	print("The result of function multi is :", res3)
+asyncio.run(multi_wait())
+#print(multi(2, 3))
+
+###############################################################################
+
+#function has 4 float inputs and 1 float output
+def add(a,b,c,d):
+	print ("Example 6:")
+	return a+b+c+d
+
+#submit function "add"
+async def add_wait():
+	fut4 = starpu.task_submit(add, [1.2, 2.5, 3.6, 4.9])
+	res4=await fut4
+	print("The result of function add is :", res4)
+asyncio.run(add_wait())
+#print(add(1.2, 2.5, 3.6, 4.9))
+
+###############################################################################
 
-starpu.pause()
+#function has 2 int inputs 1 float input and 1 float output 1 int output
+def sub(a,b,c):
+	print ("Example 7:")
+	return a-b-c, a-b
 
-print("begin to submit task in python")
-fut=starpu.task_submit(hello)
-salut()
-#starpu.task_submit(hello)
-#starpu.task_submit(hello)
-print("finish to submit task in python")
-starpu.resume()
+#submit function "sub"
+async def sub_wait():
+	fut5 = starpu.task_submit(sub, [6, 2, 5.9])
+	res5 = await fut5
+	print("The result of function sub is:", res5)
+asyncio.run(sub_wait())
+#print(sub(6, 2, 5.9))
 
-#print("begin to sleep")
-#time.sleep(1)
-#print("finish to sleep")
+###############################################################################
 
 starpu.task_wait_for_all()