Browse Source

starpupy: await the future object manually if it is not finished when we need its result for the next task

HE Kun 4 years ago
parent
commit
5e5fd00fd1
3 changed files with 24 additions and 4 deletions
  1. 2 2
      starpupy/examples/starpu_py.py
  2. 7 1
      starpupy/src/__init__.py
  3. 15 1
      starpupy/src/starpu_task_wrapper.c

+ 2 - 2
starpupy/examples/starpu_py.py

@@ -129,8 +129,8 @@ async def main():
 
 	#apply starpu.delayed(add_deco)
     fut6 = add_deco(1,2,3)
-    res6 = await fut6
-    print("The result of function is", res6)
+    #res6 = await fut6
+    #print("The result of function is", res6)
 
     #apply starpu.delayed(sub_deco)
     fut7 = sub_deco(fut6, 1)

+ 7 - 1
starpupy/src/__init__.py

@@ -18,4 +18,10 @@
 from.starpupy import *
 from .delay import *
 #from . import joblib
-from .intermedia import *
+from .intermedia import *
+
+import asyncio
+
+async def wait_for_fut(fut):
+	return await fut
+

+ 15 - 1
starpupy/src/starpu_task_wrapper.c

@@ -110,6 +110,20 @@ void prologue_cb_func(void *cl_arg)
 		const char* tp = Py_TYPE(obj)->tp_name;
 		if(strcmp(tp, "_asyncio.Future") == 0)
 		{
+			PyObject *done =  PyObject_CallMethod(obj, "done", NULL);
+			/*if the future object is not finished, we will await it for the result*/
+			if (!PyObject_IsTrue(done))
+			{
+				PyObject *pModule = PyImport_ImportModule("starpu");
+				PyObject *pDict = PyModule_GetDict(pModule);
+				/*call the method wait_for_fut to await obj*/
+				PyObject *wait_method=PyDict_GetItemString(pDict, "wait_for_fut");
+				/*call wait_for_fut(obj)*/
+				PyObject *wait_obj = PyObject_CallFunctionObjArgs(wait_method, obj, NULL);
+				/*call obj = asyncio.run_coroutine_threadsafe(wait_for_fut(obj), loop)*/
+				obj = PyObject_CallMethod(asyncio_module, "run_coroutine_threadsafe", "O,O", wait_obj, loop);
+			}
+		    
 			/*if one of arguments is Future, get its result*/
 			PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
 			/*replace the Future argument to its result*/
@@ -561,7 +575,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
 	task->prologue_callback_func=&prologue_cb_func;
-	task->callback_func=&cb_func;
+	task->epilogue_callback_func=&cb_func;
 
 	/*call starpu_task_submit method*/
 	Py_BEGIN_ALLOW_THREADS