Просмотр исходного кода

starpupy: "starpu.task_submit" returns asyncio.Future

HE Kun лет назад: 4
Родитель
Сommit
4431d5d0b3
1 измененных файлов с 87 добавлено и 73 удалено
  1. 87 73
      starpupy/wrapper/starpu/starpu_task_wrapper.c

+ 87 - 73
starpupy/wrapper/starpu/starpu_task_wrapper.c

@@ -6,110 +6,124 @@
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
 
-/********************preparation of calling python function*************************/
-//static PyObject* func_py;
-/*call python function no input no output*/
-static void py_callback(PyObject * func_py){
-	PyObject *pRetVal;
+/*********************Functions passed in task_submit wrapper***********************/
 
-    // make sure we own the GIL
+static PyObject* asyncio_module; /*python asyncio library*/
+
+/*structure contains parameters which are passed to starpu_task.cl_arg*/
+struct codelet_struct { 
+    PyObject* f; /*the python function passed in*/
+    PyObject* rv; /*return value when using PyObject_CallObject call the function f*/
+    PyObject* fut; /*asyncio.Future*/
+    PyObject* lp; /*asyncio.Eventloop*/
+};
+typedef struct codelet_struct codelet_st;
+
+/*function passed to starpu_codelet.cpu_func*/
+void codelet_func(void *buffers[], void *cl_arg){
+
+    codelet_st* cst = (codelet_st*) cl_arg;
+
+    /*make sure we own the GIL*/
     PyGILState_STATE state = PyGILState_Ensure();
-    //printf("func_py in py_callback is %p\n", func_py);
-    // verify that func is a proper callable
-    if (!PyCallable_Check(func_py)) {
+
+    /*verify that the function is a proper callable*/
+    if (!PyCallable_Check(cst->f)) {
 
         printf("py_callback: expected a callablen\n"); 
-        exit(-1);
+        exit(1);
     }
 
-    // call the function
-    pRetVal = PyObject_CallObject(func_py, NULL);
-    Py_DECREF(func_py);
+    /*call the function*/
+    PyObject *pRetVal = PyObject_CallObject(cst->f, NULL);
+    cst->rv=pRetVal;
 
-    // check for Python exceptions
-    if (PyErr_Occurred()) {
-
-        PyErr_Print(); 
-        exit(-1);
-    }
-	Py_DECREF(pRetVal);
+    Py_DECREF(cst->f);
 
-	//printf("finish callback\n");
-    // restore previous GIL state and return 
+    /*restore previous GIL state*/
     PyGILState_Release(state);
 }
-/***********************************************************************************/
 
-/*****************************Methods and their wrappers****************************/
+/*function passed to starpu_task.callback_func*/
+void cb_func(void *v){
 
-/*structure contains type of parameters*/
-struct codelet_struct { 
-    PyObject* f; //function no input no output
-};
-typedef struct codelet_struct codelet_st;
+	struct starpu_task *task=starpu_task_get_current();
+    codelet_st* cst = (codelet_st*) task->cl_arg;
 
-/*cpu_func*/
-void codelet_func(void *buffers[], void *cl_arg){
-    //printf("begin to print in codelet_func\n");
-    codelet_st* cst = (codelet_st*) cl_arg;
-    py_callback(cst->f);
-    //printf("finish to print in codelet_func\n");
-}
+    /*make sure we own the GIL*/
+    PyGILState_STATE state = PyGILState_Ensure();
 
-/*call back function to deallocate task*/
-void cb_func(void*f){
-	struct starpu_task *task=starpu_task_get_current();
-	free(task->cl);
+    /*set the Future result and mark the Future as done*/
+    PyObject * set_result = PyObject_GetAttrString(cst->fut, "set_result");
+    PyObject * loop_callback = PyObject_CallMethod(cst->lp, "call_soon_threadsafe", "(O,O)", set_result, cst->rv);
+
+    Py_DECREF(loop_callback);
+    Py_DECREF(set_result);
+    Py_DECREF(cst->rv);
+    Py_DECREF(cst->fut);
+    Py_DECREF(cst->lp);
+
+    /*restore previous GIL state*/
+    PyGILState_Release(state);
+
+    /*deallocate task*/
+    free(task->cl);
 	free(task->cl_arg);
 }
 
+/***********************************************************************************/
+
+/*****************************Wrappers of StarPU methods****************************/
 /*wrapper submit method*/
 static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
+
+    /*the python function passed in*/
 	PyObject* func_py;
+    /*get the running Event loop*/
+    PyObject* loop = PyObject_CallMethod(asyncio_module, "get_running_loop", NULL);
+    /*create a asyncio.Future object*/
+    PyObject* fut = PyObject_CallMethod(loop, "create_future", NULL);
 
 	if (!PyArg_ParseTuple(args, "O", &func_py))
 		return NULL;
-	//printf("func_py in wrapper is %p\n", func_py);
 
-	//call submit method
-	//allocate a task structure and initialize it with default values 
+	/*allocate a task structure and initialize it with default values*/
     struct starpu_task *task=starpu_task_create();
-
-    //allocate a codelet structure
+    /*allocate a codelet structure*/
     struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
-    //initialize func_cl with default values
+    /*initialize func_cl with default values*/
     starpu_codelet_init(func_cl);
     func_cl->cpu_func=&codelet_func;
 
-    //allocate a new structure to pass the function python
-    codelet_st *cst;
-    cst = (codelet_st*)malloc(sizeof(codelet_st));
+    /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
+    codelet_st *cst = (codelet_st*)malloc(sizeof(codelet_st));
     cst->f = func_py;
+    cst->fut = fut;
+    cst->lp = loop;
     Py_INCREF(func_py);
+    Py_INCREF(fut);
+    Py_INCREF(loop);
 
     task->cl=func_cl;
     task->cl_arg=cst;
 
+    /*call starpu_task_submit method*/
     int retval=starpu_task_submit(task);
-    printf("finish to submit task, result is %d\n", retval);
 
     task->callback_func=&cb_func;
 
-    //return type is void
-    Py_INCREF(Py_None);
-    return Py_None;
-	
+    return fut;	
 }
 
 /*wrapper wait for all method*/
 static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args){
 
-	//call wait for all method
+	/*call starpu_task_wait_for_all method*/
 	Py_BEGIN_ALLOW_THREADS
 	starpu_task_wait_for_all();
 	Py_END_ALLOW_THREADS
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -117,10 +131,10 @@ static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args
 /*wrapper pause method*/
 static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args){
 
-	//call pause method
+	/*call starpu_pause method*/
 	starpu_pause();
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -128,10 +142,10 @@ static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args){
 /*wrapper resume method*/
 static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
 
-	//call resume method
+	/*call starpu_resume method*/
 	starpu_resume();
 
-	//return type is void
+	/*return type is void*/
 	Py_INCREF(Py_None);
     return Py_None;
 }
@@ -142,29 +156,30 @@ static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
 /*method table*/
 static PyMethodDef taskMethods[] = 
 { 
-  {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, //submit method
-  {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, //wait for all method
-  {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, //pause method
-  {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, //resume method
+  {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
+  {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, /*wait for all method*/
+  {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
+  {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
   {NULL, NULL}
 };
 
 /*deallocation function*/
-static void taskFree(void* f){
+static void taskFree(void *v){
 	starpu_shutdown();
+    Py_DECREF(asyncio_module);
 }
 
-/*the method table must be referenced in the module definition structure*/
+/*module definition structure*/
 static struct PyModuleDef taskmodule={
   PyModuleDef_HEAD_INIT,
   "task", /*name of module*/
   NULL,
   -1,
-  taskMethods,
+  taskMethods, /*method table*/
   NULL,
   NULL,
   NULL,
-  taskFree
+  taskFree /*deallocation function*/
 };
 
 /*initialization function*/
@@ -172,12 +187,11 @@ PyMODINIT_FUNC
 PyInit_task(void)
 {
     PyEval_InitThreads();
-    //starpu initialization
-    printf("begin initialization\n");
+    /*starpu initialization*/
 	int ret = starpu_init(NULL);
-	printf("finish initialization, result is %d\n",ret);
-
-    //python import initialization
+    /*python asysncio import*/
+    asyncio_module = PyImport_ImportModule("asyncio");
+    /*module import initialization*/
     return PyModule_Create(&taskmodule);
 }
 /***********************************************************************************/