Browse Source

starpupy: add perfmodel option in joblib.parallel function

HE Kun 4 years ago
parent
commit
e0fae99ffb

+ 47 - 5
starpupy/src/starpu/joblib.py

@@ -16,10 +16,26 @@
 from starpu import starpupy
 import asyncio
 import math
+import os
+import pickle
+import json
 
 # get the number of CPUs controlled by StarPU
 n_cpus=starpupy.cpu_worker_get_count()
 
+#class perfmodel
class Perfmodel(object):
	"""Own a StarPU performance-model structure for a given symbol.

	Wraps the C-side ``struct starpu_perfmodel`` (held as a PyCapsule in
	``pstruct``) created by ``starpupy.init_perfmodel`` and frees it when
	this object is garbage-collected.
	"""

	def __init__(self, symbol):
		# symbol: name under which StarPU stores/calibrates this model
		self.symbol=symbol
		self.pstruct=starpupy.init_perfmodel(self.symbol)

	def get_struct(self):
		"""Return the underlying perfmodel capsule (struct starpu_perfmodel*)."""
		return self.pstruct

	def __del__(self):
		# Release the C-side structure.  Guard against:
		#  - partially constructed instances: if init_perfmodel raised,
		#    self.pstruct was never set and the bare attribute access
		#    would raise AttributeError inside __del__;
		#  - interpreter shutdown, where the starpupy module global may
		#    already have been cleared to None.
		pstruct = getattr(self, "pstruct", None)
		if pstruct is not None and starpupy is not None:
			starpupy.free_perfmodel(pstruct)
+
 # split the list ls into n_block sub-lists
 def partition(ls, n_block):
 	if len(ls)>=n_block:
@@ -38,7 +54,18 @@ def partition(ls, n_block):
 		L=[ls[i:i+1] for i in range (len(ls))]
 	return L
 
-def future_generator(g, n_jobs):
+# generate the dictionary which contains the perfmodel symbol and its struct pointer
# Registry mapping a perfmodel symbol to its Perfmodel instance, so that
# repeated submissions under the same symbol share one underlying struct.
dict_perf={}
def dict_perf_generator(perfsymbol):
	"""Return the cached Perfmodel for *perfsymbol*, creating it on first use.

	Uses a membership test instead of comparing ``dict.get(...) == None``
	(PEP 8: comparisons to None must use ``is``), and avoids the double
	dictionary lookup of the original.
	"""
	if perfsymbol not in dict_perf:
		dict_perf[perfsymbol] = Perfmodel(perfsymbol)
	return dict_perf[perfsymbol]
+
+def future_generator(g, n_jobs, perfsymbol):
+	p=dict_perf_generator(perfsymbol)
 	# g is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
 	L=list(g)
 	# generate a list of function according to g
@@ -62,21 +89,25 @@ def future_generator(g, n_jobs):
 	# operation in each split list
 	L_fut=[]
 	for i in range(len(L_split)):
-		fut=starpupy.task_submit(lf, L_split[i])
+		fut=starpupy.task_submit(lf, L_split[i], p.get_struct())
 		L_fut.append(fut)
 	return L_fut
 
-def parallel(*, mode, n_jobs=None):
+def parallel(*, mode, n_jobs=None, perfmodel, \
+	         backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
+	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
+	         mmap_mode='r', prefer=None, require=None):
 	# the mode normal, user can call the function directly without using async
 	if mode=="normal":
 		def parallel_normal(g):
 			async def asy_main():
-				L_fut=future_generator(g, n_jobs)
+				L_fut=future_generator(g, n_jobs, perfmodel)
 				res=[]
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
 					res.extend(L_res)
 				print(res)
+				#p.free_struct()
 				return res
 			asyncio.run(asy_main())
 			return asy_main
@@ -84,7 +115,7 @@ def parallel(*, mode, n_jobs=None):
 	# the mode future, user needs to use asyncio module and await the Future result in main function
 	elif mode=="future":
 		def parallel_future(g):
-			L_fut=future_generator(g, n_jobs)
+			L_fut=future_generator(g, n_jobs, perfmodel)
 			fut=asyncio.gather(*L_fut)
 			return fut
 		return parallel_future
@@ -93,3 +124,14 @@ def delayed(f):
 	def delayed_func(*args):
 		return f, args
 	return delayed_func
+
+
+######################################################################
+# dump performance model
def dump_perfmodel(perfmodel):
	"""Write the history-based performance model registered under the
	symbol *perfmodel* to disk via StarPU.

	Raises KeyError if no model with that symbol was ever created by
	a parallel() call.
	"""
	starpupy.save_history_based_model(dict_perf[perfmodel].get_struct())
+
+# dump function
def dump(value, filename, compress=0, protocol=None, cache_size=None):
	"""Pickle *value* to *filename* (joblib.dump-compatible helper).

	filename may be a path or an already-open binary file object.
	compress and cache_size are accepted for joblib API compatibility
	but are currently ignored; protocol is forwarded to pickle.

	Bug fixed: the original passed the path string straight to
	pickle.dump, which requires a *file object* and therefore raised
	TypeError whenever a filename was given.
	"""
	if hasattr(filename, "write"):
		# caller handed us a file-like object: use it directly
		pickle.dump(value, filename, protocol)
	else:
		with open(filename, "wb") as f:
			pickle.dump(value, f, protocol)

+ 122 - 12
starpupy/src/starpu/starpu_task_wrapper.c

@@ -80,10 +80,6 @@ void codelet_func(void *buffers[], void *cl_arg){
     PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
     cst->rv=pRetVal;
 
-    for(int i = 0; i < PyTuple_Size(cst->argList); i++){
-        Py_DECREF(PyTuple_GetItem(cst->argList, i));
-    }
-    Py_DECREF(cst->argList);
     //Py_DECREF(cst->f);
 
     /*restore previous GIL state*/
@@ -110,6 +106,19 @@ void cb_func(void *v){
     Py_DECREF(cst->fut);
     Py_DECREF(cst->lp);
 
+    //Py_DECREF(perfmodel);
+    struct starpu_codelet * func_cl=(struct starpu_codelet *) task->cl;
+    if (func_cl->model != NULL){
+      struct starpu_perfmodel *perf =(struct starpu_perfmodel *) func_cl->model;
+      PyObject* perfmodel=PyCapsule_New(perf, "Perf", 0);
+      Py_DECREF(perfmodel);
+    }
+
+    for(int i = 0; i < PyTuple_Size(cst->argList); i++){
+        Py_DECREF(PyTuple_GetItem(cst->argList, i));
+    }
+    Py_DECREF(cst->argList);
+
     /*restore previous GIL state*/
     PyGILState_Release(state);
 
@@ -136,6 +145,82 @@ static PyObject *PyTask_FromTask(struct starpu_task *task) {
   return PyCapsule_New(task, "Task", del_Task);
 }
 
+/***********************************************************************************/
+static size_t sizebase (struct starpu_task * task, unsigned nimpl){
+
+  codelet_st* cst = (codelet_st*) task->cl_arg;
+
+  PyObject* obj=PyTuple_GetItem(cst->argList, 0);
+  /*get the length of arguments*/
+  int n = PyList_Size(obj);
+
+  return n;
+}
+
+static void del_Perf(PyObject *obj){
+  struct starpu_perfmodel *perf=(struct starpu_perfmodel*)PyCapsule_GetPointer(obj, "Perf");
+  free(perf);
+}
+/*initialization of perfmodel*/
+static PyObject* init_perfmodel(PyObject *self, PyObject *args){
+
+  char* sym;
+
+  if (!PyArg_ParseTuple(args, "s", &sym))
+    return NULL;
+
+  /*allocate a perfmodel structure*/
+  struct starpu_perfmodel *perf=(struct starpu_perfmodel*)calloc(1, sizeof(struct starpu_perfmodel));
+
+  /*get the perfmodel symbol*/
+  char* p =strdup(sym);
+  perf->symbol=p;
+  perf->type=STARPU_HISTORY_BASED;
+
+  /*struct perfmodel*->PyObject**/
+  PyObject *perfmodel=PyCapsule_New(perf, "Perf", NULL);
+
+  return perfmodel;
+}
+
+
+/*free perfmodel*/
+static PyObject* free_perfmodel(PyObject *self, PyObject *args){
+
+  PyObject* perfmodel;
+  if (!PyArg_ParseTuple(args, "O", &perfmodel))
+    return NULL;
+
+  /*PyObject*->struct perfmodel**/
+  struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
+
+  starpu_save_history_based_model(perf);
+  //starpu_perfmodel_unload_model(perf);
+  free(perf->symbol);
+  starpu_perfmodel_deinit(perf);
+  free(perf);
+
+  /*return type is void*/
+  Py_INCREF(Py_None);
+  return Py_None;
+}
+
+static PyObject* starpu_save_history_based_model_wrapper(PyObject *self, PyObject *args){
+
+  PyObject* perfmodel;
+  if (!PyArg_ParseTuple(args, "O", &perfmodel))
+    return NULL;
+
+  /*PyObject*->struct perfmodel**/
+  struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
+
+  starpu_save_history_based_model(perf);
+
+  /*return type is void*/
+  Py_INCREF(Py_None);
+  return Py_None;
+}
+
 /*****************************Wrappers of StarPU methods****************************/
 /*wrapper submit method*/
 static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
@@ -176,7 +261,17 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     /*initialize func_cl with default values*/
     starpu_codelet_init(func_cl);
     func_cl->cpu_func=&codelet_func;
-    //func_cl->model=p; p malloc perfmode
+    
+    /*check whether the last argument in args is the perfmodel*/
+    PyObject* perfmodel=PyTuple_GetItem(args, PyTuple_Size(args)-1);
+    const char* tp_perf = Py_TYPE(perfmodel)->tp_name;
+    if (strcmp(tp_perf, "PyCapsule")==0){
+      /*PyObject*->struct perfmodel**/
+      struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
+      func_cl->model=perf;
+      Py_INCREF(perfmodel);
+    }
+    
 
     /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
     codelet_st *cst = (codelet_st*)malloc(sizeof(codelet_st));
@@ -188,15 +283,23 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     Py_INCREF(loop);
 
     /*pass args in argList*/
-    if (PyTuple_Size(args)==1)
+    if (PyTuple_Size(args)==1 || (PyTuple_Size(args)==2 && strcmp(tp_perf, "PyCapsule")==0))/*function no arguments*/
       cst->argList = PyTuple_New(0);
-    else{
+    else if(PyTuple_Size(args)>2 && strcmp(tp_perf, "PyCapsule")==0){/*function has arguments and the last argument in args is the perfmodel*/
+      cst->argList = PyTuple_New(PyTuple_Size(args)-2);
+      for (int i=0; i < PyTuple_Size(args)-2; i++){
+        PyObject* tmp=PyTuple_GetItem(args, i+1);
+        PyTuple_SetItem(cst->argList, i, tmp);
+        Py_INCREF(PyTuple_GetItem(cst->argList, i));
+      }
+    }
+    else{/*function has arguments and no perfmodel*/
       cst->argList = PyTuple_New(PyTuple_Size(args)-1);
       for (int i=0; i < PyTuple_Size(args)-1; i++){
         PyObject* tmp=PyTuple_GetItem(args, i+1);
         PyTuple_SetItem(cst->argList, i, tmp);
         Py_INCREF(PyTuple_GetItem(cst->argList, i));
-     }
+      }
     }
 
     task->cl=func_cl;
@@ -204,6 +307,10 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     /*call starpu_task_submit method*/
     starpu_task_submit(task);
     task->callback_func=&cb_func;
+    if (strcmp(tp_perf, "PyCapsule")==0){
+      struct starpu_perfmodel *perf =(struct starpu_perfmodel *) func_cl->model;
+      perf->size_base=&sizebase;
+    }
 
     //printf("the number of reference is %ld\n", Py_REFCNT(func_py));
     //_Py_PrintReferences(stderr);
@@ -222,7 +329,7 @@ static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args
 
 	/*return type is void*/
 	Py_INCREF(Py_None);
-    return Py_None;
+  return Py_None;
 }
 
 /*wrapper pause method*/
@@ -233,7 +340,7 @@ static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args){
 
 	/*return type is void*/
 	Py_INCREF(Py_None);
-    return Py_None;
+  return Py_None;
 }
 
 /*wrapper resume method*/
@@ -244,7 +351,7 @@ static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
 
 	/*return type is void*/
 	Py_INCREF(Py_None);
-    return Py_None;
+  return Py_None;
 }
 
 /*wrapper get count cpu method*/
@@ -268,11 +375,14 @@ static PyMethodDef starpupyMethods[] =
   {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
   {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
   {"cpu_worker_get_count", starpu_cpu_worker_get_count_wrapper, METH_VARARGS, "return the number of CPUs controlled by StarPU"}, /*get count cpu method*/
+  {"init_perfmodel", init_perfmodel, METH_VARARGS, "initialize struct starpu_perfmodel"}, /*initialize perfmodel*/
+  {"free_perfmodel", free_perfmodel, METH_VARARGS, "free struct starpu_perfmodel"}, /*free perfmodel*/
+  {"save_history_based_model", starpu_save_history_based_model_wrapper, METH_VARARGS, "save the performance model"}, /*save the performance model*/
   {NULL, NULL}
 };
 
 /*deallocation function*/
-static void starpupyFree(void *v){
+static void starpupyFree(void* self){
 	starpu_shutdown();
   Py_DECREF(asyncio_module);
   //COUNTREFS();

+ 1 - 1
starpupy/tests/Makefile

@@ -2,5 +2,5 @@ PYTHON ?= python3
 
 all:
 	PYTHONPATH=../src $(PYTHON) starpu_py.py
-	PYTHONPATH=../src $(PYTHON) starpu_py_parallel.py
+	PYTHONPATH=../src STARPU_CALIBRATE=1 $(PYTHON) starpu_py_parallel.py
 

+ 22 - 7
starpupy/tests/starpu_py_parallel.py

@@ -17,6 +17,7 @@ import starpu
 import time
 import asyncio
 from math import sqrt
+from math import log10
 
 #generate a list to store functions
 g_func=[]
@@ -59,27 +60,41 @@ def sub(a,b,c):
 	return res_sub1, res_sub2
 g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
 
+#the size of generator
+N=1000
 print("************************")
 print("parallel Normal version:")
 print("************************")
-print("--input is iterable argument list")
-starpu.joblib.parallel(mode="normal", n_jobs=-2)(starpu.joblib.delayed(sqrt)(i**2)for i in range(10))
+print("--input is iterable argument list, example 1")
+starpu.joblib.parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+
+print("--input is iterable argument list, example 2")
+starpu.joblib.parallel(mode="normal", n_jobs=-2, perfmodel="second")(starpu.joblib.delayed(log10)(10**i)for i in range(N))
 
 print("--input is iterable function list")
-starpu.joblib.parallel(mode="normal", n_jobs=3)(g_func)
+starpu.joblib.parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
 
 
 print("************************")
 print("parallel Future version:")
 print("************************")
 async def main():
-	print("--input is iterable argument list")
-	fut1=starpu.joblib.parallel(mode="future", n_jobs=-3)(starpu.joblib.delayed(sqrt)(i**2)for i in range(10))
+	print("--input is iterable argument list, example 1")
+	fut1=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
 	print(res1)
 
-	print("--input is iterable function list")
-	fut2=starpu.joblib.parallel(mode="future", n_jobs=2)(g_func)
+	print("--input is iterable argument list, example 2")
+	fut2=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(10**i)for i in range(N))
 	res2=await fut2
 	print(res2)
+
+	print("--input is iterable function list")
+	fut3=starpu.joblib.parallel(mode="future", n_jobs=2, perfmodel="third")(g_func)
+	res3=await fut3
+	print(res3)
 asyncio.run(main())
+
+starpu.joblib.dump_perfmodel(perfmodel="first")
+starpu.joblib.dump_perfmodel(perfmodel="second")
+starpu.joblib.dump_perfmodel(perfmodel="third")