Selaa lähdekoodia

starpupy: add the case that the argument of python function passed in can be the Future result of another python function

HE Kun 4 vuotta sitten
vanhempi
commit
50b7911e27
2 muutettua tiedostoa jossa 71 lisäystä ja 12 poistoa
  1. 54 11
      starpupy/src/starpu/starpu_task_wrapper.c
  2. 17 1
      starpupy/tests/starpu_py.py

+ 54 - 11
starpupy/src/starpu/starpu_task_wrapper.c

@@ -15,6 +15,7 @@
  */
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 
 #include <starpu.h>
 
@@ -50,6 +51,18 @@ void codelet_func(void *buffers[], void *cl_arg){
         exit(1);
     }
     
+    /*check the arguments of python function passed in*/
+    for (int i=0; i < PyTuple_Size(cst->argList); i++){
+      PyObject* obj=PyTuple_GetItem(cst->argList, i);
+      const char* tp = Py_TYPE(obj)->tp_name;
+      if(strcmp(tp, "_asyncio.Future") == 0){
+        /*if one of arguments is Future, get its result*/
+        PyObject * fut_result = PyObject_CallMethod(obj, "result", NULL);
+        /*replace the Future argument to its result*/
+        PyTuple_SetItem(cst->argList, i, fut_result);
+      }
+    }
+
     /*call the python function*/
     PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
     cst->rv=pRetVal;
@@ -58,6 +71,7 @@ void codelet_func(void *buffers[], void *cl_arg){
     for(int i = 0; i < PyTuple_Size(cst->argList); i++){
         Py_DECREF(PyTuple_GetItem(cst->argList, i));
     }
+    Py_DECREF(cst->argList);
 
     /*restore previous GIL state*/
     PyGILState_Release(state);
@@ -88,9 +102,25 @@ void cb_func(void *v){
     /*deallocate task*/
     free(task->cl);
 	  free(task->cl_arg);
+
 }
 
 /***********************************************************************************/
+/*PyObject*->struct starpu_task**/
+static struct starpu_task *PyTask_AsTask(PyObject* obj){
+  return (struct starpu_task *) PyCapsule_GetPointer(obj, "Task");
+}
+
+/* destructor function for task */
+static void del_Task(PyObject *obj) {
+  struct starpu_task* obj_task=PyTask_AsTask(obj);
+  obj_task->destroy=1; /*XXX we should call starpu task destroy*/
+}
+
+/*struct starpu_task*->PyObject**/
+static PyObject *PyTask_FromTask(struct starpu_task *task) {
+  return PyCapsule_New(task, "Task", del_Task);
+}
 
 /*****************************Wrappers of StarPU methods****************************/
 /*wrapper submit method*/
@@ -100,18 +130,32 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     PyObject* loop = PyObject_CallMethod(asyncio_module, "get_running_loop", NULL);
     /*create a asyncio.Future object*/
     PyObject* fut = PyObject_CallMethod(loop, "create_future", NULL);
-    /*the python function passed in*/
-    PyObject* func_py;
-    /*the args of function*/
-    PyObject* pArgs;
 
-    /*first argument in args is always the function*/
-    func_py = PyTuple_GetItem(args, 0); 
+    /*first argument in args is always the python function passed in*/
+    PyObject* func_py = PyTuple_GetItem(args, 0);
+    Py_INCREF(func_py);
 
 	  /*allocate a task structure and initialize it with default values*/
     struct starpu_task *task=starpu_task_create();
-
-    // int PyObject_SetAttrString(fut, starpu_task, int(task))
+    task->destroy=0;
+
+    PyObject* PyTask=PyTask_FromTask(task);
+
+    /*set one of fut attribute to the task pointer*/
+    PyObject_SetAttrString(fut, "starpu_task", PyTask);
+    /*check the arguments of python function passed in*/
+    for (int i=1; i < PyTuple_Size(args); i++){
+      PyObject* obj=PyTuple_GetItem(args, i);
+      const char* tp = Py_TYPE(obj)->tp_name;
+      if(strcmp(tp, "_asyncio.Future") == 0){
+        /*if one of arguments is Future, get its corresponding task*/
+        PyObject* fut_task=PyObject_GetAttrString(obj, "starpu_task");
+        /*declare task dependencies between the current task and the corresponding task of Future argument*/
+        starpu_task_declare_deps(task, 1, PyTask_AsTask(fut_task));
+
+        Py_DECREF(fut_task);
+      }
+    }
     
     /*allocate a codelet structure*/
     struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
@@ -124,7 +168,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     cst->f = func_py;
     cst->fut = fut;
     cst->lp = loop;
-    Py_INCREF(func_py);
+    
     Py_INCREF(fut);
     Py_INCREF(loop);
 
@@ -145,10 +189,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
 
     /*call starpu_task_submit method*/
     int retval=starpu_task_submit(task);
-
     task->callback_func=&cb_func;
 
-    return fut;	
+    return fut;
 }
 
 /*wrapper wait for all method*/

+ 17 - 1
starpupy/tests/starpu_py.py

@@ -82,6 +82,15 @@ def add_deco(a,b,c):
 
 ###############################################################################
 
+#using decorator wrap the function with input
+@starpu.delayed
+def sub_deco(x,a):
+	print ("Example 9:")
+	print ("This is a function with input and output wrapped by the decorator function:")
+	return x-a
+
+###############################################################################
+
 async def main():
 	#submit function "hello"
     fut = starpu.task_submit(hello)
@@ -116,9 +125,16 @@ async def main():
     print("The result of function sub is:", res5)
 
 	#apply starpu.delayed(add_deco)
-    res6 = await add_deco(1,2,3)
+    fut6 = add_deco(1,2,3)
+    res6 = await fut6
     print("The result of function is", res6)
 
+    #apply starpu.delayed(sub_deco)
+    fut7 = sub_deco(fut6, 1)
+    res7 = await fut7
+    print("The first argument of this function is the result of Example 8")
+    print("The result of function is", res7)
+
 asyncio.run(main())