瀏覽代碼

starpupy: add numpy array as submitted task input, add some useful Joblib functions/classes

HE Kun 4 年之前
父節點
當前提交
dc6ae967e2
共有 4 個文件被更改,包括 204 次插入和 65 次刪除
  1. 10 0
      starpupy/examples/starpu_py.py
  2. 51 6
      starpupy/examples/starpu_py_parallel.py
  3. 126 46
      starpupy/src/joblib.py
  4. 17 13
      starpupy/src/starpu_task_wrapper.c

+ 10 - 0
starpupy/examples/starpu_py.py

@@ -16,6 +16,7 @@
 import starpu
 import time
 import asyncio
+import numpy as np
 
 ############################################################################
 #function no input no output print hello world
@@ -91,6 +92,11 @@ def sub_deco(x,a):
 
 ###############################################################################
 
+def scal(a, t):
+    return t*a
+
+t=np.array([1,2,3,4,5,6,7,8,9,10])
+
 async def main():
 	#submit function "hello"
     fut = starpu.task_submit()(hello)
@@ -135,6 +141,10 @@ async def main():
     print("The first argument of this function is the result of Example 8")
     print("The result of function is", res7)
 
+    fut8 = starpu.task_submit()(scal, 2, t)
+    res8 = await fut8
+    print("The result of Example 10 is", res8)
+
 asyncio.run(main())
 
 

+ 51 - 6
starpupy/examples/starpu_py_parallel.py

@@ -18,6 +18,7 @@ import time
 import asyncio
 from math import sqrt
 from math import log10
+import numpy as np
 
 #generate a list to store functions
 g_func=[]
@@ -61,7 +62,21 @@ def sub(a,b,c):
 g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
 
 #the size of generator
-N=1000000
+N=100
+# a=np.array([1,2,3,4,5,6,7,8,9,10])
+# print(type(a))
+
+def scal(a, t):
+    return t*a
+
+A=np.arange(N)
+
+
+#starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+# for x in [10, 100, 1000, 10000, 100000, 1000000]:
+# 	for X2 in range(x, x*10, x):
+# 		starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(X2))
+# 		print(range(X2))
 
 print("************************")
 print("parallel Normal version:")
@@ -69,12 +84,34 @@ print("************************")
 print("--input is iterable argument list, example 1")
 starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 
-print("--input is iterable argument list, example 2")
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+#################scikit test###################
+DEFAULT_JOBLIB_BACKEND = starpu.joblib.get_active_backend()[0].__class__
+class MyBackend(DEFAULT_JOBLIB_BACKEND):  # type: ignore
+    def __init__(self, *args, **kwargs):
+        self.count = 0
+        super().__init__(*args, **kwargs)
+
+    def start_call(self):
+        self.count += 1
+        return super().start_call()
+
+
+starpu.joblib.register_parallel_backend('testing', MyBackend)
+
+with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
+	print("backend and n_jobs is", ba, n_jobs)
+###############################################
+
+print("--input is iterable argument list, example 2, with multi input")
+a=np.arange(10)
+b=np.arange(10)
+starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 
 print("--input is iterable function list")
 starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
 
+print("--input is numpy array")
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
 
 print("************************")
 print("parallel Future version:")
@@ -85,15 +122,23 @@ async def main():
 	res1=await fut1
 	#print(res1)
 
-	print("--input is iterable argument list, example 2")
-	fut2=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+	print("--input is iterable argument list, example 2, with multi input")
+	a=np.arange(10)
+	b=np.arange(10)
+	fut2=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 	res2=await fut2
 	#print(res2)
 
 	print("--input is iterable function list")
-	fut3=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="third")(g_func)
+	fut3=starpu.joblib.Parallel(mode="future", n_jobs=2)(g_func)
 	res3=await fut3
 	#print(res3)
+
+	print("--input is numpy array")
+	fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
+	res4=await fut4
+	#print(res4)
+
 asyncio.run(main())
 
 starpu.perfmodel_plot(perfmodel="first",view=False)

+ 126 - 46
starpupy/src/joblib.py

@@ -15,10 +15,20 @@
 #
 from starpu import starpupy
 import starpu
+import sys
+try:
+   sys.path.remove('/usr/local/lib/python3.8/site-packages/starpu')
+except:
+   pass
 import joblib
 import asyncio
 import math
 import functools
+import numpy as np
+import threading
+
+BACKENDS={}
+_backend = threading.local()
 
 # get the number of CPUs controlled by StarPU
 def cpu_count():
@@ -43,20 +53,11 @@ def partition(ls, n_block):
 		L=[ls[i:i+1] for i in range (len(ls))]
 	return L
 
-def future_generator(g, n_jobs, dict_task):
-	# g is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
-	L=list(g)
-	# generate a list of function according to g
-	def lf(ls):
-		L_func=[]
-		for i in range(len(ls)):
-			# the first element is the function
-			f=ls[i][0]
-			# the second element is the args list of a type tuple
-			L_args=list(ls[i][1])
-			# generate a list of function
-			L_func.append(f(*L_args))
-		return L_func
+def future_generator(iterable, n_jobs, dict_task):
+	# iterable is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
+	print("iterable type is ", type(iterable))
+	print("iterable is", iterable)
+
 	# get the number of block
 	if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
 		print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
@@ -64,26 +65,70 @@ def future_generator(g, n_jobs, dict_task):
 		n_block=cpu_count()+1+n_jobs
 	else:
 		n_block=n_jobs
-	# generate the split function list
-	L_split=partition(L,n_block)
-	# operation in each split list
-	L_fut=[]
-	for i in range(len(L_split)):
-		fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-							   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
-			                  (lf, L_split[i])
-		L_fut.append(fut)
-	return L_fut
-
-class Parallel(joblib.Parallel):
+
+	# if arguments are not passed by a generator
+	if type(iterable) is tuple:
+		L_fut=[]
+		for i in range(n_block):
+			L_args=[]
+			for j in range(len(iterable[1])):
+				if type(iterable[1][j]) is np.ndarray:
+					arr_split=np.array_split(iterable[1][j],n_block)
+					L_args.append(arr_split[i])
+				else:
+					L_args.append(iterable[1][j])
+			print("L_args is", L_args)
+			fut=starpu.task_submit()(iterable[0], *L_args)
+			L_fut.append(fut)
+		return L_fut
+
+	# if iterable is a generator or a list of function
+	#elif isinstance(iterable,generator):
+	else:
+		L=list(iterable)
+		#print(L)
+		# generate a list of function according to iterable
+		def lf(ls):
+			L_func=[]
+			for i in range(len(ls)):
+				# the first element is the function
+				f=ls[i][0]
+				# the second element is the args list of a type tuple
+				L_args=list(ls[i][1])
+				# generate a list of function
+				L_func.append(f(*L_args))
+			return L_func
+
+		# generate the split function list
+		L_split=partition(L,n_block)
+		# operation in each split list
+		L_fut=[]
+		for i in range(len(L_split)):
+			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+				                  (lf, L_split[i])
+			L_fut.append(fut)
+		return L_fut
+
+class Parallel(object):
 	def __init__(self, mode="normal", perfmodel=None, end_msg=None,\
 			 name=None, synchronous=0, priority=0, color=None, flops=None,\
 	         n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
 	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
 	         mmap_mode='r', prefer=None, require=None):
-		super(Parallel, self).__init__(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
-	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
-	         mmap_mode='r', prefer=None, require=None)
+		active_backend, context_n_jobs = get_active_backend(prefer=prefer, require=require, verbose=verbose)
+		nesting_level = active_backend.nesting_level
+
+		if backend is None:
+			backend = active_backend
+
+		else:
+			try:
+				backend_factory = BACKENDS[backend]
+			except KeyError as e:
+				raise ValueError("Invalid backend: %s, expected one of %r"
+                                 % (backend, sorted(BACKENDS.keys()))) from e
+			backend = backend_factory(nesting_level=nesting_level)
 
 		self.mode=mode
 		self.perfmodel=perfmodel
@@ -94,36 +139,40 @@ class Parallel(joblib.Parallel):
 		self.color=color
 		self.flops=flops
 		self.n_jobs=n_jobs
+		self._backend=backend
+
+	def print_progress(self):
+		pass
 
 	def __call__(self,iterable):
 		#generate the dictionary of task_submit
 		dict_task={'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority, 'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
+		if hasattr(self._backend, 'start_call'):
+			self._backend.start_call()
 		# the mode normal, user can call the function directly without using async
 		if self.mode=="normal":
-			#def parallel_normal(g):
 			async def asy_main():
 				L_fut=future_generator(iterable, self.n_jobs, dict_task)
 				res=[]
+				print(L_fut)
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
+					print(L_res)
 					res.extend(L_res)
-				#print(res)
+				print(res)
 				return res
 			asyncio.run(asy_main())
-			return asy_main
-			#return parallel_normal
+			retVal=asy_main
 		# the mode future, user needs to use asyncio module and await the Future result in main function
 		elif self.mode=="future":
-			#def parallel_future(g):
 			L_fut=future_generator(iterable, self.n_jobs, dict_task)
 			fut=asyncio.gather(*L_fut)
-			if self.end_msg==None:
-				return fut
-			else:
+			if self.end_msg!=None:
 				fut.add_done_callback(functools.partial(print, self.end_msg))
-				return fut
-			#return fut
-			#return parallel_future
+			retVal=fut
+		if hasattr(self._backend, 'stop_call'):
+			self._backend.stop_call()
+		return retVal
 
 def delayed(f):
 	def delayed_func(*args):
@@ -154,13 +203,44 @@ def register_compressor(compressor_name, compressor, force=False):
 	return joblib.register_compressor(compressor_name, compressor, force)
 
 def effective_n_jobs(n_jobs=-1):
-	return joblib.effective_n_jobs(n_jobs)
+	return cpu_count()
 
-class parallel_backend(joblib.parallel_backend):
+def get_active_backend(prefer=None, require=None, verbose=0):
+	return joblib.parallel.get_active_backend(prefer, require, verbose)
+
+class parallel_backend(object):
 	def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                  **backend_params):
-		super(parallel_backend, self).__init__(backend, n_jobs=-1, inner_max_num_threads=None,
-                 **backend_params)
+		if isinstance(backend, str):
+			backend = BACKENDS[backend](**backend_params)
+
+		current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
+		if backend.nesting_level is None:
+			if current_backend_and_jobs is None:
+				nesting_level = 0
+			else:
+				nesting_level = current_backend_and_jobs[0].nesting_level
+
+			backend.nesting_level = nesting_level
+
+		# Save the backends info and set the active backend
+		self.old_backend_and_jobs = current_backend_and_jobs
+		self.new_backend_and_jobs = (backend, n_jobs)
+
+		_backend.backend_and_jobs = (backend, n_jobs)
+
+	def __enter__(self):
+		return self.new_backend_and_jobs
+
+	def __exit__(self, type, value, traceback):
+		self.unregister()
+
+	def unregister(self):
+		if self.old_backend_and_jobs is None:
+			if getattr(_backend, 'backend_and_jobs', None) is not None:
+				del _backend.backend_and_jobs
+		else:
+			_backend.backend_and_jobs = self.old_backend_and_jobs
 
-def register_parallel_backend(name, factory, make_default=False):
-	return joblib.register_parallel_backend(name, factory, make_default)
+def register_parallel_backend(name, factory):
+	BACKENDS[name] = factory

+ 17 - 13
starpupy/src/starpu_task_wrapper.c

@@ -21,6 +21,7 @@
 
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
+#include <numpy/arrayobject.h>
 
 /*macro*/
 #if defined(Py_DEBUG) || defined(DEBUG)
@@ -65,8 +66,7 @@ void codelet_func(void *buffers[], void *cl_arg)
 	}
 
 	/*check the arguments of python function passed in*/
-	int i;
-	for (i=0; i < PyTuple_Size(cst->argList); i++)
+	for (int i=0; i < PyTuple_Size(cst->argList); i++)
 	{
 		PyObject *obj = PyTuple_GetItem(cst->argList, i);
 		const char *tp = Py_TYPE(obj)->tp_name;
@@ -77,10 +77,16 @@ void codelet_func(void *buffers[], void *cl_arg)
 			/*replace the Future argument to its result*/
 			PyTuple_SetItem(cst->argList, i, fut_result);
 		}
+    /*else if (strcmp(tp, "numpy.ndarray")==0)
+    {
+      printf("array is %p\n", obj);
+    }*/
 	}
 
 	/*call the python function*/
 	PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
+  //const char *tp = Py_TYPE(pRetVal)->tp_name;
+  //printf("return value type is %s\n", tp);
 	cst->rv = pRetVal;
 
 	//Py_DECREF(cst->f);
@@ -107,6 +113,7 @@ void cb_func(void *v)
 	Py_DECREF(cst->rv);
 	Py_DECREF(cst->fut);
 	Py_DECREF(cst->lp);
+  Py_DECREF(cst->argList);
 
 	//Py_DECREF(perfmodel);
 	struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
@@ -117,13 +124,6 @@ void cb_func(void *v)
 		Py_DECREF(perfmodel);
 	}
 
-	int i;
-	for(i = 0; i < PyTuple_Size(cst->argList); i++)
-	{
-		Py_DECREF(PyTuple_GetItem(cst->argList, i));
-	}
-	Py_DECREF(cst->argList);
-
 	/*restore previous GIL state*/
 	PyGILState_Release(state);
 
@@ -255,8 +255,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	/*set one of fut attribute to the task pointer*/
 	PyObject_SetAttrString(fut, "starpu_task", PyTask);
 	/*check the arguments of python function passed in*/
-	int i;
-	for (i=1; i < PyTuple_Size(args)-1; i++)
+	for (int i=1; i < PyTuple_Size(args)-1; i++)
 	{
 		PyObject *obj=PyTuple_GetItem(args, i);
 		const char* tp = Py_TYPE(obj)->tp_name;
@@ -304,8 +303,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
     else
     {/*function has arguments*/
       cst->argList = PyTuple_New(PyTuple_Size(args)-2);
-      int i;
-      for (i=0; i < PyTuple_Size(args)-2; i++)
+      for (int i=0; i < PyTuple_Size(args)-2; i++)
       {
         PyObject *tmp=PyTuple_GetItem(args, i+1);
         PyTuple_SetItem(cst->argList, i, tmp);
@@ -363,7 +361,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
   }
 
 	/*call starpu_task_submit method*/
+  Py_BEGIN_ALLOW_THREADS
 	int ret = starpu_task_submit(task);
+  Py_END_ALLOW_THREADS
 	assert(ret==0);
 	task->callback_func=&cb_func;
 	if (strcmp(tp_perf, "PyCapsule")==0)
@@ -493,6 +493,10 @@ PyInit_starpupy(void)
 	assert(ret==0);
 	/*python asysncio import*/
 	asyncio_module = PyImport_ImportModule("asyncio");
+  #ifdef STARPU_PYTHON_HAVE_NUMPY
+  /*numpy import array*/
+  import_array();
+  #endif
 	/*module import initialization*/
 	return PyModule_Create(&starpupymodule);
 }