Browse Source

starpupy: adapt to MPI master/slave model

HE Kun 4 years ago
parent
commit
b443c57d60

+ 9 - 2
starpupy/examples/starpu_py.py

@@ -13,10 +13,13 @@
 #
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
+from math import sqrt
 import starpu
+from starpu import starpupy
 import time
 import asyncio
 
+
 ############################################################################
 #function no input no output print hello world
 def hello():
@@ -120,7 +123,7 @@ async def main():
     print("The result of function add is :", res4)
 
 	#submit function "sub" but only provide function name
-    fut5 = starpu.task_submit()("sub", 6, 2, 5.9)
+    fut5 = starpu.task_submit()(sub, 6, 2, 5.9)
     res5 = await fut5
     print("The result of function sub is:", res5)
 
@@ -135,7 +138,11 @@ async def main():
     print("The first argument of this function is the result of Example 8")
     print("The result of function is", res7)
 
-asyncio.run(main())
+    fut8 = starpu.task_submit()("sqrt", 4)
+    res8 = await fut8
+    print("The result of function sqrt is:", res8)
 
+asyncio.run(main())
 
+starpupy.shutdown()
 #starpu.task_wait_for_all()

+ 2 - 2
starpupy/examples/starpu_py_np.py

@@ -14,6 +14,7 @@
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
 import starpu
+from starpu import starpupy
 import asyncio
 import numpy as np
 
@@ -36,5 +37,4 @@ async def main():
 
 asyncio.run(main())
 
-
-#starpu.task_wait_for_all()
+starpupy.shutdown()

+ 3 - 0
starpupy/examples/starpu_py_parallel.py

@@ -15,6 +15,7 @@
 #
 import starpu
 import starpu.joblib
+from starpu import starpupy
 import time
 import asyncio
 from math import sqrt
@@ -348,3 +349,5 @@ starpu.perfmodel_plot(perfmodel="func",view=displayPlot)
 
 starpu.perfmodel_plot(perfmodel="log_list",view=displayPlot)
 starpu.perfmodel_plot(perfmodel="log_arr",view=displayPlot)
+
+starpupy.shutdown()

+ 50 - 76
starpupy/src/starpu_task_wrapper.c

@@ -43,20 +43,10 @@ extern void _Py_CountReferences(FILE*);
 /*********************Functions passed in task_submit wrapper***********************/
 
 static PyObject *asyncio_module; /*python asyncio module*/
-static PyObject *cloudpickle_module; /*cloudpickle module*/
-
-/*structure contains parameters which are passed to starpu_task.cl_arg*/
-// struct codelet_args
-// {
-// 	PyObject *f; /*the python function passed in*/
-// 	PyObject *argList; /*argument list of python function passed in*/
-// 	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
-// 	PyObject *fut; /*asyncio.Future*/
-// 	PyObject *lp; /*asyncio.Eventloop*/
-// };
+
 static char* starpu_cloudpickle_dumps(PyObject *obj, PyObject **obj_bytes, Py_ssize_t *obj_data_size)
 {
-	
+	PyObject *cloudpickle_module = PyImport_ImportModule("cloudpickle");
 	PyObject *dumps = PyObject_GetAttrString(cloudpickle_module, "dumps");
 	*obj_bytes= PyObject_CallFunctionObjArgs(dumps, obj, NULL);
 
@@ -68,7 +58,7 @@ static char* starpu_cloudpickle_dumps(PyObject *obj, PyObject **obj_bytes, Py_ss
 
 static PyObject* starpu_cloudpickle_loads(char* pyString, Py_ssize_t pyString_size)
 {
-
+	PyObject *cloudpickle_module = PyImport_ImportModule("cloudpickle");
 	PyObject *loads = PyObject_GetAttrString(cloudpickle_module, "loads");
 	PyObject *obj_bytes_str = PyBytes_FromStringAndSize(pyString, pyString_size);
 	PyObject *obj = PyObject_CallFunctionObjArgs(loads, obj_bytes_str, NULL);
@@ -96,7 +86,7 @@ void prologue_cb_func(void *cl_arg)
 	struct starpu_codelet_pack_arg_data data_org;
 	starpu_codelet_unpack_arg_init(&data_org, &task->cl_arg, &task->cl_arg_size);
 
-	/*get func_data and func_data_size*/
+	/*get func_py char**/
 	starpu_codelet_pick_arg(&data_org, &func_data, &func_data_size);
 	/*get argList*/
 	starpu_codelet_unpack_arg(&data_org, &argList, sizeof(argList));
@@ -136,7 +126,6 @@ void prologue_cb_func(void *cl_arg)
 	char* arg_data = starpu_cloudpickle_dumps(argList, &arg_bytes, &arg_data_size);
     starpu_codelet_pack_arg(&data, arg_data, arg_data_size);
     Py_DECREF(arg_bytes);
-
     /*repack fut*/
 	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
 	/*repack loop*/
@@ -160,6 +149,9 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
     Py_ssize_t arg_data_size;
 	PyObject *argList; /*argument list of python function passed in*/
 
+	/*make sure we own the GIL*/
+	PyGILState_STATE state = PyGILState_Ensure();
+
 	//struct codelet_args *cst = (struct codelet_args*) cl_arg;
 
 	struct starpu_task *task = starpu_task_get_current();
@@ -167,24 +159,10 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
-	/*get func_py char*/
-	// starpu_codelet_unpack_arg(&data, &func_data_size, sizeof(func_data_size));
-    // func_data = (char *)malloc(func_data_size);
-    // starpu_codelet_unpack_arg(&data, func_data, func_data_size);
-    /*skip func_data_size*/
-    //starpu_codelet_unpack_discard_arg(&data);
-    /*get func_data*/
+	/*get func_py char**/
     starpu_codelet_pick_arg(&data, &func_data, &func_data_size);
-	//starpu_codelet_unpack_arg(&data, &func_py, sizeof(func_py));
-	/*get argList char*/
-	// starpu_codelet_unpack_arg(&data, &arg_data_size, sizeof(arg_data_size));
-    // arg_data = (char *)malloc(arg_data_size);
-    // starpu_codelet_unpack_arg(&data, arg_data, arg_data_size);
-    /*skip arg_data_size*/
-    //starpu_codelet_unpack_discard_arg(&data);
-    /*get arg_data*/
+	/*get argList char**/
     starpu_codelet_pick_arg(&data, &arg_data, &arg_data_size);
-	//starpu_codelet_unpack_arg(&data, &argList, sizeof(argList));
 	/*skip fut*/
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip loop*/
@@ -192,9 +170,6 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	/*skip sb*/
 	starpu_codelet_unpack_discard_arg(&data);
 
-	/*make sure we own the GIL*/
-	PyGILState_STATE state = PyGILState_Ensure();
-
 	/*use cloudpickle to load function (maybe only function name)*/
 	PyObject *pFunc=starpu_cloudpickle_loads(func_data, func_data_size);
 	
@@ -226,16 +201,19 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	}
 
 	/*call the python function get the return value rv*/
-	//PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
-	//cst->rv = pRetVal;
 	PyObject *rv = PyObject_CallObject(func_py, argList);
+
+	/*Initialize struct starpu_codelet_pack_arg_data for return value*/
+	struct starpu_codelet_pack_arg_data data_ret;
+	starpu_codelet_pack_arg_init(&data_ret);
+
 	/*if the result is None type, pack NULL without using cloudpickle*/
 	if (rv==Py_None)
 	{
 		char* rv_data=NULL;
 		Py_ssize_t rv_data_size=0;
-		starpu_codelet_pack_arg(&data, &rv_data_size, sizeof(rv_data_size));
-	    starpu_codelet_pack_arg(&data, &rv_data, sizeof(rv_data));
+		starpu_codelet_pack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data_ret, &rv_data, sizeof(rv_data));
 	}
 	/*else use cloudpickle to dump rv*/
 	else
@@ -243,15 +221,13 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 		Py_ssize_t rv_data_size;
 		PyObject *rv_bytes;
 		char* rv_data = starpu_cloudpickle_dumps(rv, &rv_bytes, &rv_data_size);
-		starpu_codelet_pack_arg(&data, &rv_data_size, sizeof(rv_data_size));
-	    starpu_codelet_pack_arg(&data, rv_data, rv_data_size);
+		starpu_codelet_pack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data_ret, rv_data, rv_data_size);
 	    Py_DECREF(rv_bytes);
 	}
 	
-	//starpu_codelet_pack_arg(&data, &rv, sizeof(rv));
-    starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
-
-	//Py_DECREF(cst->f);
+    /*store the return value in task_>cl_ret*/
+    starpu_codelet_pack_arg_fini(&data_ret, &task->cl_ret, &task->cl_ret_size);
 
     Py_DECREF(func_py);
     Py_DECREF(argList);
@@ -269,17 +245,18 @@ void cb_func(void *v)
     Py_ssize_t rv_data_size;
 	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
 
+    /*make sure we own the GIL*/
+	PyGILState_STATE state = PyGILState_Ensure();
+
 	struct starpu_task *task = starpu_task_get_current();
-	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
-	/*Initialize struct starpu_codelet_unpack_arg_data*/
+
+	/*Initialize struct starpu_codelet_unpack_arg_data data*/
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
 	/*skip func_py*/
-	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip argList*/
-	//starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_discard_arg(&data);
 	/*get fut*/
 	starpu_codelet_unpack_arg(&data, &fut, sizeof(fut));
@@ -287,26 +264,23 @@ void cb_func(void *v)
 	starpu_codelet_unpack_arg(&data, &loop, sizeof(loop));
 	/*skip sb*/
 	starpu_codelet_unpack_discard_arg(&data);
-	/*get rv_data_size*/
-	starpu_codelet_unpack_arg(&data, &rv_data_size, sizeof(rv_data_size));
-	//starpu_codelet_pick_arg(&data, &rv_data, &rv_data_size);
-   
-	//starpu_codelet_unpack_arg(&data, &rv, sizeof(rv));
 
-	/*make sure we own the GIL*/
-	PyGILState_STATE state = PyGILState_Ensure();
+	/*Initialize struct starpu_codelet_unpack_arg_data data*/
+	struct starpu_codelet_pack_arg_data data_ret;
+	starpu_codelet_unpack_arg_init(&data_ret, &task->cl_ret, &task->cl_ret_size);
+	/*get rv_data_size*/
+	starpu_codelet_unpack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
 
 	/*if the rv_data_size is 0, the result is None type*/
 	if (rv_data_size==0)
 	{
+		starpu_codelet_unpack_discard_arg(&data_ret);
 		rv=Py_None;
 	}
 	/*else use cloudpickle to load rv*/
 	else
 	{
-		// rv_data = (char *)malloc(rv_data_size);
-        // starpu_codelet_unpack_arg(&data, rv_data, rv_data_size);
-		starpu_codelet_pick_arg(&data, &rv_data, &rv_data_size);
+		starpu_codelet_pick_arg(&data_ret, &rv_data, &rv_data_size);
 		rv=starpu_cloudpickle_loads(rv_data, rv_data_size);
 	}
 
@@ -319,9 +293,7 @@ void cb_func(void *v)
 	Py_DECREF(rv);
 	Py_DECREF(fut);
 	Py_DECREF(loop);
-	//Py_DECREF(argList);
 
-	//Py_DECREF(perfmodel);
 	struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
 	if (func_cl->model != NULL)
 	{
@@ -362,7 +334,7 @@ static PyObject *PyTask_FromTask(struct starpu_task *task)
 static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 {
 	int sb;
-	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+
 	/*Initialize struct starpu_codelet_unpack_arg_data*/
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
@@ -379,10 +351,6 @@ static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 	starpu_codelet_unpack_discard_arg(&data);
 	/*get sb*/
 	starpu_codelet_unpack_arg(&data, &sb, sizeof(sb));
-	/*skip rv*/
-	starpu_codelet_unpack_discard_arg(&data);
-	starpu_codelet_unpack_discard_arg(&data);
-	//starpu_codelet_unpack_args(task_submit->cl_arg, &func_py, &argList, &fut, &loop, &sb, &rv);
 
 	return sb;
 }
@@ -512,12 +480,6 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 		Py_INCREF(perfmodel);
 	}
 
-	/*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
-	//struct codelet_args *cst = (struct codelet_args*)malloc(sizeof(struct codelet_args));
-	//cst->f = func_py;
-	//cst->fut = fut;
-	//cst->lp = loop;
-
 	/*Initialize struct starpu_codelet_pack_arg_data*/
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_pack_arg_init(&data);
@@ -539,14 +501,13 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 			Py_INCREF(PyTuple_GetItem(argList, i));
 		}
 	}
-    	
+
 	/*use cloudpickle to dump func_py*/
 	Py_ssize_t func_data_size;
 	PyObject *func_bytes;
 	char* func_data = starpu_cloudpickle_dumps(func_py, &func_bytes, &func_data_size);
     starpu_codelet_pack_arg(&data, func_data, func_data_size);
     Py_DECREF(func_bytes);
-	//starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
     /*pack argList*/
 	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
 	/*pack fut*/
@@ -555,7 +516,6 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
 
 	task->cl=func_cl;
-	//task->cl_arg=cst;
 
 	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None, sizebase=0*/
 	/*const char * name*/
@@ -703,6 +663,17 @@ static PyObject* starpu_task_nsubmitted_wrapper(PyObject *self, PyObject *args)
 	/*Return the number of submitted tasks which have not completed yet */
 	return Py_BuildValue("i", num_task);
 }
+
+/*wrapper shutdown method*/
+static PyObject* starpu_shutdown_wrapper(PyObject *self, PyObject *args)
+{
+	/*call starpu_shutdown method*/
+	starpu_shutdown();
+
+	/*return type is void*/
+	Py_INCREF(Py_None);
+	return Py_None;
+}
 /***********************************************************************************/
 
 /***************The module’s method table and initialization function**************/
@@ -720,6 +691,7 @@ static PyMethodDef starpupyMethods[] =
 	{"sched_get_min_priority", starpu_sched_get_min_priority_wrapper, METH_VARARGS, "get the number of min priority"}, /*get the number of min priority*/
 	{"sched_get_max_priority", starpu_sched_get_max_priority_wrapper, METH_VARARGS, "get the number of max priority"}, /*get the number of max priority*/
 	{"task_nsubmitted", starpu_task_nsubmitted_wrapper, METH_VARARGS, "get the number of submitted tasks which have not completed yet"}, /*get the number of submitted tasks which have not completed yet*/
+	{"shutdown", starpu_shutdown_wrapper, METH_VARARGS, "shutdown starpu"}, /*shutdown starpu*/
 	{NULL, NULL}
 };
 
@@ -750,13 +722,15 @@ PyMODINIT_FUNC
 PyInit_starpupy(void)
 {
 	PyEval_InitThreads();
+	//PyThreadState* st = PyEval_SaveThread();
+	Py_BEGIN_ALLOW_THREADS
 	/*starpu initialization*/
 	int ret = starpu_init(NULL);
 	assert(ret==0);
+	Py_END_ALLOW_THREADS
+	
 	/*python asysncio import*/
 	asyncio_module = PyImport_ImportModule("asyncio");
-	/*cloudpickle import*/
-	cloudpickle_module = PyImport_ImportModule("cloudpickle");
 
 #ifdef STARPU_PYTHON_HAVE_NUMPY
 	/*numpy import array*/