Browse Source

starpupy: change the function parallel to class Parallel

(cherry picked from commit cf2b0f3335937b9539b2fe7a8ce4b8218293c775)
HE Kun 4 years ago
parent
commit
58db88a6b9

File diff suppressed because it is too large
+ 17 - 8
doc/doxygen/chapters/400_python.doxy


+ 70 - 20
starpupy/src/starpu/joblib.py

@@ -15,12 +15,15 @@
 #
 from starpu import starpupy
 import starpu
+import joblib
 import asyncio
 import math
 import functools
 
 # get the number of CPUs controlled by StarPU
-n_cpus=starpupy.cpu_worker_get_count()
+def cpu_count():
+	n_cpus=starpupy.cpu_worker_get_count()
+	return n_cpus
 
 # split a list ls into n_block numbers of sub-lists 
 def partition(ls, n_block):
@@ -55,10 +58,10 @@ def future_generator(g, n_jobs, dict_task):
 			L_func.append(f(*L_args))
 		return L_func
 	# get the number of block
-	if n_jobs<-n_cpus-1 or n_jobs>n_cpus:
-		print("Error: n_jobs is out of range, number of CPUs is", n_cpus)
+	if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
+		print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
 	elif n_jobs<0:
-		n_block=n_cpus+1+n_jobs
+		n_block=cpu_count()+1+n_jobs
 	else:
 		n_block=n_jobs
 	# generate the split function list
@@ -72,18 +75,34 @@ def future_generator(g, n_jobs, dict_task):
 		L_fut.append(fut)
 	return L_fut
 
-def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,\
+class Parallel(joblib.Parallel):
+	def __init__(self, mode="normal", perfmodel=None, end_msg=None,\
 			 name=None, synchronous=0, priority=0, color=None, flops=None,\
-	         backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
+	         n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
 	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
 	         mmap_mode='r', prefer=None, require=None):
-	#generate the dictionary of task_submit
-	dict_task={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': perfmodel}
-	# the mode normal, user can call the function directly without using async
-	if mode=="normal":
-		def parallel_normal(g):
+		super(Parallel, self).__init__(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
+	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
+	         mmap_mode='r', prefer=None, require=None)
+
+		self.mode=mode
+		self.perfmodel=perfmodel
+		self.end_msg=end_msg
+		self.name=name
+		self.synchronous=synchronous
+		self.priority=priority
+		self.color=color
+		self.flops=flops
+		self.n_jobs=n_jobs
+
+	def __call__(self,iterable):
+		#generate the dictionary of task_submit
+		dict_task={'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority, 'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
+		# the mode normal, user can call the function directly without using async
+		if self.mode=="normal":
+			#def parallel_normal(g):
 			async def asy_main():
-				L_fut=future_generator(g, n_jobs, dict_task)
+				L_fut=future_generator(iterable, self.n_jobs, dict_task)
 				res=[]
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
@@ -92,19 +111,19 @@ def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,\
 				return res
 			asyncio.run(asy_main())
 			return asy_main
-		return parallel_normal
-	# the mode future, user needs to use asyncio module and await the Future result in main function
-	elif mode=="future":
-		def parallel_future(g):
-			L_fut=future_generator(g, n_jobs, dict_task)
+			#return parallel_normal
+		# the mode future, user needs to use asyncio module and await the Future result in main function
+		elif self.mode=="future":
+			#def parallel_future(g):
+			L_fut=future_generator(iterable, self.n_jobs, dict_task)
 			fut=asyncio.gather(*L_fut)
-			if end_msg==None:
+			if self.end_msg==None:
 				return fut
 			else:
-				fut.add_done_callback(functools.partial(print, end_msg))
+				fut.add_done_callback(functools.partial(print, self.end_msg))
 				return fut
 			#return fut
-		return parallel_future
+			#return parallel_future
 
 def delayed(f):
 	def delayed_func(*args):
@@ -113,4 +132,35 @@ def delayed(f):
 
 
 ######################################################################
+class Memory(joblib.Memory):
+	def __init__(self,location=None, backend='local', cachedir=None,
+                 mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
+                 backend_options=None):
+		super(Memory, self).__init__(location=None, backend='local', cachedir=None,
+                 mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
+                 backend_options=None)
+
+
+def dump(value, filename, compress=0, protocol=None, cache_size=None):
+	return joblib.dump(value, filename, compress, protocol, cache_size)
+
+def load(filename, mmap_mode=None):
+	return joblib.load(filename, mmap_mode)
+
+def hash(obj, hash_name='md5', coerce_mmap=False):
+	return joblib.hash(obj, hash_name, coerce_mmap)
+
+def register_compressor(compressor_name, compressor, force=False):
+	return joblib.register_compressor(compressor_name, compressor, force)
+
+def effective_n_jobs(n_jobs=-1):
+	return joblib.effective_n_jobs(n_jobs)
+
+class parallel_backend(joblib.parallel_backend):
+	def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
+                 **backend_params):
+		super(parallel_backend, self).__init__(backend, n_jobs=-1, inner_max_num_threads=None,
+                 **backend_params)
 
+def register_parallel_backend(name, factory, make_default=False):
+	return joblib.register_parallel_backend(name, factory, make_default)

+ 6 - 6
starpupy/tests/starpu_py_parallel.py

@@ -67,13 +67,13 @@ print("************************")
 print("parallel Normal version:")
 print("************************")
 print("--input is iterable argument list, example 1")
-starpu.joblib.parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 
 print("--input is iterable argument list, example 2")
-starpu.joblib.parallel(mode="normal", n_jobs=2, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
 
 print("--input is iterable function list")
-starpu.joblib.parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
+starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
 
 
 print("************************")
@@ -81,17 +81,17 @@ print("parallel Future version:")
 print("************************")
 async def main():
 	print("--input is iterable argument list, example 1")
-	fut1=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+	fut1=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
 	#print(res1)
 
 	print("--input is iterable argument list, example 2")
-	fut2=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+	fut2=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
 	res2=await fut2
 	#print(res2)
 
 	print("--input is iterable function list")
-	fut3=starpu.joblib.parallel(mode="future", n_jobs=2, perfmodel="third")(g_func)
+	fut3=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="third")(g_func)
 	res3=await fut3
 	#print(res3)
 asyncio.run(main())