Przeglądaj źródła

starpupy: add prologue_callback_func to adapt to the case that the argument of submitted function is Future

HE Kun 4 lat temu
rodzic
commit
7410112ffa
2 zmienionych plików z 108 dodań i 44 usunięć
  1. 5 5
      starpupy/examples/starpu_py.py
  2. 103 39
      starpupy/src/starpu_task_wrapper.c

+ 5 - 5
starpupy/examples/starpu_py.py

@@ -129,11 +129,11 @@ async def main():
     res6 = await fut6
     print("The result of function is", res6)
 
- #    #apply starpu.delayed(sub_deco)
- #    fut7 = sub_deco(fut6, 1)
- #    res7 = await fut7
- #    print("The first argument of this function is the result of Example 8")
- #    print("The result of function is", res7)
+    #apply starpu.delayed(sub_deco)
+    fut7 = sub_deco(fut6, 1)
+    res7 = await fut7
+    print("The first argument of this function is the result of Example 8")
+    print("The result of function is", res7)
 
 asyncio.run(main())
 

+ 103 - 39
starpupy/src/starpu_task_wrapper.c

@@ -54,14 +54,15 @@ static PyObject *cloudpickle_module; /*cloudpickle module*/
 // 	PyObject *fut; /*asyncio.Future*/
 // 	PyObject *lp; /*asyncio.Eventloop*/
 // };
-static char* starpu_cloudpickle_dumps(PyObject *obj, Py_ssize_t* obj_data_size)
+static char* starpu_cloudpickle_dumps(PyObject *obj, PyObject **obj_bytes, Py_ssize_t* obj_data_size)
 {
+
 	PyObject *dumps = PyObject_GetAttrString(cloudpickle_module, "dumps");
-	PyObject *obj_bytes= PyObject_CallFunctionObjArgs(dumps, obj, NULL);
+	*obj_bytes= PyObject_CallFunctionObjArgs(dumps, obj, NULL);
 
 	char* obj_data;
-    PyBytes_AsStringAndSize(obj_bytes, &obj_data, obj_data_size);
-    
+    PyBytes_AsStringAndSize(*obj_bytes, &obj_data, obj_data_size);
+
 	return obj_data;
 }
 
@@ -72,8 +73,83 @@ static PyObject* starpu_cloudpickle_loads(char* pyString, Py_ssize_t pyString_si
 	PyObject *obj_bytes_str = PyBytes_FromStringAndSize(pyString, pyString_size);
 	PyObject *obj = PyObject_CallFunctionObjArgs(loads, obj_bytes_str, NULL);
 
+	Py_DECREF(obj_bytes_str);
+
 	return obj;
 }
+
+/* prologue_callback_func*/
+void prologue_cb_func(void * cl_arg)
+{
+	PyObject *func_data;
+	Py_ssize_t func_data_size;
+	PyObject *argList;
+	PyObject *fut;
+	PyObject *loop;
+	int sb;
+
+	/*make sure we own the GIL*/
+	PyGILState_STATE state = PyGILState_Ensure();
+
+	struct starpu_task *task = starpu_task_get_current();
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
+	struct starpu_codelet_pack_arg_data data_org;
+	starpu_codelet_unpack_arg_init(&data_org, &task->cl_arg, &task->cl_arg_size);
+
+	/*get func_data and func_data_size*/
+	starpu_codelet_pick_arg(&data_org, &func_data, &func_data_size);
+	/*get argList*/
+	starpu_codelet_unpack_arg(&data_org, &argList, sizeof(argList));
+	/*get fut*/
+	starpu_codelet_unpack_arg(&data_org, &fut, sizeof(fut));
+	/*get loop*/
+	starpu_codelet_unpack_arg(&data_org, &loop, sizeof(loop));
+	/*get sb*/
+	starpu_codelet_unpack_arg(&data_org, &sb, sizeof(sb));
+
+	/*Repack the data*/
+	/*Initialize struct starpu_codelet_pack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_pack_arg_init(&data);
+
+	/*repack func_data*/
+	starpu_codelet_pack_arg(&data, func_data, func_data_size);
+
+	/*check if there is Future in argList, if so, get the Future result*/
+	int i;
+	for(i=0; i < PyTuple_Size(argList); i++)
+	{
+		PyObject *obj=PyTuple_GetItem(argList, i);
+		const char* tp = Py_TYPE(obj)->tp_name;
+		if(strcmp(tp, "_asyncio.Future") == 0)
+		{
+			/*if one of arguments is Future, get its result*/
+			PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
+			/*replace the Future argument to its result*/
+			PyTuple_SetItem(argList, i, fut_result);
+		}
+	}
+
+	/*use cloudpickle to dump dumps argList*/
+	Py_ssize_t arg_data_size;
+	PyObject* arg_bytes;
+	char* arg_data = starpu_cloudpickle_dumps(argList, &arg_bytes, &arg_data_size);
+    starpu_codelet_pack_arg(&data, arg_data, arg_data_size);
+    Py_DECREF(arg_bytes);
+
+    /*repack fut*/
+	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
+	/*repack loop*/
+	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
+	/*repack sb*/
+	starpu_codelet_pack_arg(&data, &sb, sizeof(sb));
+	/*finish repacking data and store the struct in cl_arg*/
+	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*restore previous GIL state*/
+	PyGILState_Release(state);
+}
+
 /*function passed to starpu_codelet.cpu_func*/
 void starpupy_codelet_func(void *buffers[], void *cl_arg)
 {
@@ -96,7 +172,7 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
     // func_data = (char *)malloc(func_data_size);
     // starpu_codelet_unpack_arg(&data, func_data, func_data_size);
     /*skip func_data_size*/
-    starpu_codelet_unpack_discard_arg(&data);
+    //starpu_codelet_unpack_discard_arg(&data);
     /*get func_data*/
     starpu_codelet_pick_arg(&data, &func_data, &func_data_size);
 	//starpu_codelet_unpack_arg(&data, &func_py, sizeof(func_py));
@@ -105,7 +181,7 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
     // arg_data = (char *)malloc(arg_data_size);
     // starpu_codelet_unpack_arg(&data, arg_data, arg_data_size);
     /*skip arg_data_size*/
-    starpu_codelet_unpack_discard_arg(&data);
+    //starpu_codelet_unpack_discard_arg(&data);
     /*get arg_data*/
     starpu_codelet_pick_arg(&data, &arg_data, &arg_data_size);
 	//starpu_codelet_unpack_arg(&data, &argList, sizeof(argList));
@@ -130,25 +206,6 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 		exit(1);
 	}
 
-	/*check the arguments of python function passed in*/
-	int i;
-	for(i=0; i < PyTuple_Size(argList); i++)
-	{
-		PyObject *obj = PyTuple_GetItem(argList, i);
-		const char *tp = Py_TYPE(obj)->tp_name;
-		if(strcmp(tp, "_asyncio.Future") == 0)
-		{
-			/*if one of arguments is Future, get its result*/
-			PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
-			/*replace the Future argument to its result*/
-			PyTuple_SetItem(argList, i, fut_result);
-		}
-		/*else if (strcmp(tp, "numpy.ndarray")==0)
-		  {
-		  printf("array is %p\n", obj);
-		  }*/
-	}
-
 	/*call the python function get the return value rv*/
 	//PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
 	//cst->rv = pRetVal;
@@ -165,9 +222,11 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	else
 	{
 		Py_ssize_t rv_data_size;
-		char* rv_data = starpu_cloudpickle_dumps(rv, &rv_data_size);
+		PyObject *rv_bytes;
+		char* rv_data = starpu_cloudpickle_dumps(rv, &rv_bytes, &rv_data_size);
 		starpu_codelet_pack_arg(&data, &rv_data_size, sizeof(rv_data_size));
 	    starpu_codelet_pack_arg(&data, rv_data, rv_data_size);
+	    Py_DECREF(rv_bytes);
 	}
 	
 	//starpu_codelet_pack_arg(&data, &rv, sizeof(rv));
@@ -198,10 +257,10 @@ void cb_func(void *v)
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
 	/*skip func_py*/
-	starpu_codelet_unpack_discard_arg(&data);
+	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip argList*/
-	starpu_codelet_unpack_discard_arg(&data);
+	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*get fut*/
 	starpu_codelet_unpack_arg(&data, &fut, sizeof(fut));
@@ -211,6 +270,7 @@ void cb_func(void *v)
 	starpu_codelet_unpack_discard_arg(&data);
 	/*get rv_data_size*/
 	starpu_codelet_unpack_arg(&data, &rv_data_size, sizeof(rv_data_size));
+	//starpu_codelet_pick_arg(&data, &rv_data, &rv_data_size);
    
 	//starpu_codelet_unpack_arg(&data, &rv, sizeof(rv));
 
@@ -289,10 +349,10 @@ static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
 	/*skip func_py*/
-	starpu_codelet_unpack_discard_arg(&data);
+	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip argList*/
-	starpu_codelet_unpack_discard_arg(&data);
+	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip fut*/
 	starpu_codelet_unpack_discard_arg(&data);
@@ -460,19 +520,20 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 			Py_INCREF(PyTuple_GetItem(argList, i));
 		}
 	}
-
-	/*use cloudpickle to dump func_py and argList*/
+    	
+	/*use cloudpickle to dump func_py*/
 	Py_ssize_t func_data_size;
-	char* func_data = starpu_cloudpickle_dumps(func_py, &func_data_size);
-	starpu_codelet_pack_arg(&data, &func_data_size, sizeof(func_data_size));
+	PyObject* func_bytes;
+	char* func_data = starpu_cloudpickle_dumps(func_py, &func_bytes, &func_data_size);
     starpu_codelet_pack_arg(&data, func_data, func_data_size);
+    Py_DECREF(func_bytes);
 	//starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
-	Py_ssize_t arg_data_size;
-	char* arg_data = starpu_cloudpickle_dumps(argList, &arg_data_size);
-	starpu_codelet_pack_arg(&data, &arg_data_size, sizeof(arg_data_size));
-    starpu_codelet_pack_arg(&data, arg_data, arg_data_size);
-	//starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
+
+    /*pack argList*/
+	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
+	/*pack fut*/
 	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
+	/*pack loop*/
 	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
 
 	task->cl=func_cl;
@@ -523,10 +584,13 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	PyObject *PySB = PyDict_GetItemString(dict_option, "sizebase");
 	int sb=PyLong_AsLong(PySB);
 	//printf("pack sizebase is %d\n", sb);
+	/*pack sb*/
 	starpu_codelet_pack_arg(&data, &sb, sizeof(sb));
 
+	/*finish packing data and store the struct in cl_arg*/
 	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
+	task->prologue_callback_func=&prologue_cb_func;
 	task->callback_func=&cb_func;
 
 	/*call starpu_task_submit method*/