
starpupy: add the case where the argument of starpu.joblib.delayed is in tuple format, and add examples for functions that take multiple arrays as arguments

HE Kun, 4 years ago
commit 9645b43e35

+ 40 - 0
doc/doxygen/chapters/400_python.doxy

@@ -252,6 +252,46 @@ Example 3: The result of  1.2 + 2.5 + 3.6 + 4.9 is 12.200000000000001
 Example 4: The result of  6 - 2 - 5.9 is -1.9000000000000004 and the result of 6 - 2 is 4
 \endverbatim
 
+When you want to apply parallel computing to a function whose arguments are arrays, for example:
+
+\code{.py}
+>>> def multi_array(a, b):
+...     for i in range(len(a)):
+...         a[i]=a[i]*b[i]
+...     return a
+\endcode
+
+You should pass each array argument of starpu.joblib.delayed either as a NumPy array or as a generator expression.
+
+\code{.py}
+>>> import numpy as np
+>>> A=np.arange(10)
+>>> B=np.arange(10, 20, 1)
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(multi_array)((i for i in A), (j for j in B)))
+[0, 11, 24, 39, 56, 75, 96, 119, 144, 171]
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(multi_array)(A, B))
+[0, 11, 24, 39, 56, 75, 96, 119, 144, 171]
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(multi_array)(A, (j for j in B)))
+[0, 11, 24, 39, 56, 75, 96, 119, 144, 171]
+\endcode
+
+The three calls above are equivalent, and their execution times are very close.
+
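+Internally, starpu.joblib.Parallel splits each array or generator argument into blocks (one per job) and submits one task per block. The sketch below is only a rough illustration of this splitting scheme using plain NumPy; it is not the actual StarPU implementation:
+
+\code{.py}
+>>> import numpy as np
+>>> n_jobs=2
+>>> A=np.arange(10)
+>>> B=np.arange(10, 20, 1)
+>>> A_split=np.array_split(A, n_jobs)
+>>> B_split=np.array_split(B, n_jobs)
+>>> # each pair of blocks is what one task would receive
+>>> [multi_array(a, b).tolist() for a, b in zip(A_split, B_split)]
+[[0, 11, 24, 39, 56], [75, 96, 119, 144, 171]]
+\endcode
+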
+You can also pass a scalar argument; unlike arrays, a scalar cannot be given as a generator expression. For example:
+
+\code{.py}
+>>> import numpy as np
+>>> def scal(a, t):
+... 	for i in range(len(t)):
+... 		t[i]=t[i]*a
+... 	return t
+>>> A=np.arange(10)
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(scal)(2,A))
+[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(scal)(2, (i for i in A)))
+[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
+\endcode
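+
+A scalar can also be combined with several arrays in the same call. The following sketch is based on the add_scal function from starpupy/examples/starpu_py_parallel.py; assuming the results are gathered into a single list as in the examples above, the expected output is:
+
+\code{.py}
+>>> import numpy as np
+>>> def add_scal(a, t1, t2):
+...     for i in range(len(t1)):
+...         t1[i]=t1[i]*a+t2[i]
+...     return t1
+>>> A=np.arange(10)
+>>> B=np.arange(10)
+>>> starpu.joblib.Parallel(mode="normal", n_jobs=2)(starpu.joblib.delayed(add_scal)(2, A, B))
+[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
+\endcode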
+
 \subsection ParallelParameters Parallel Parameters
 
 Without setting options of function task_submit, starpu.joblib.Parallel also provides some own parameters:

+ 215 - 48
starpupy/examples/starpu_py_parallel.py

@@ -14,6 +14,7 @@
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
 import starpu
+import starpu.joblib
 import time
 import asyncio
 from math import sqrt
@@ -40,11 +41,11 @@ def func2():
 g_func.append(starpu.joblib.delayed(func2)())
  
 #function has 2 int inputs and 1 int output
-def multi(a,b):
-	res_multi=a*b
-	print("Example 4: The result of ",a,"*",b,"is",res_multi)
-	return res_multi
-g_func.append(starpu.joblib.delayed(multi)(2, 3))
+def exp(a,b):
+	res_exp=a**b
+	print("Example 4: The result of ",a,"^",b,"is",res_exp)
+	return res_exp
+g_func.append(starpu.joblib.delayed(exp)(2, 3))
 
 #function has 4 float inputs and 1 float output
 def add(a,b,c,d):
@@ -61,30 +62,39 @@ def sub(a,b,c):
 	return res_sub1, res_sub2
 g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
 
-#the size of generator
-N=100
-# a=np.array([1,2,3,4,5,6,7,8,9,10])
-# print(type(a))
+##########functions of array calculation###############
 
 def scal(a, t):
 	for i in range(len(t)):
 		t[i]=t[i]*a
 	return t
 
-A=np.arange(N)
+def add_scal(a, t1, t2):
+	for i in range(len(t1)):
+		t1[i]=t1[i]*a+t2[i]
+	return t1
 
+def scal_arr(a, t):
+    for i in range(len(t)):
+        t[i]=t[i]*a[i]
+    return t
 
-#starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
-# for x in [10, 100, 1000, 10000, 100000, 1000000]:
-# 	for X2 in range(x, x*10, x):
-# 		starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(X2))
-# 		print(range(X2))
+def multi(a,b):
+	res_multi=a*b
+	return res_multi
 
-print("************************")
-print("parallel Normal version:")
-print("************************")
-print("--input is iterable argument list, example 1")
-starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+def multi_2arr(a, b):
+    for i in range(len(a)):
+        a[i]=a[i]*b[i]
+    return a
+
+def multi_list(l):
+	res = []
+	for (a,b) in l:
+		res.append(a*b)
+	return res
+
+########################################################
 
 #################scikit test###################
 DEFAULT_JOBLIB_BACKEND = starpu.joblib.get_active_backend()[0].__class__
@@ -104,56 +114,213 @@ with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
 	print("backend and n_jobs is", ba, n_jobs)
 ###############################################
 
-print("--input is iterable argument list, example 2, with multi input")
-a=np.arange(10)
-b=np.arange(10)
-starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 
-print("--input is iterable function list")
-p=starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="third")
-p(g_func)
+N=10000
+# A=np.arange(N)
+# B=np.arange(N)
+# a=np.arange(N)
+# b=np.arange(N, 2*N, 1)
 
-def producer():
-	for i in range(6):
-		print('Produced %s' % i)
-		yield i
-#starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
+#starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
+# for x in [10, 100, 1000, 10000, 100000, 1000000]:
+# 	for X2 in range(x, x*10, x):
+# 		starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(X2))
+# 		print(range(X2))
+
+print("************************")
+print("parallel Normal version:")
+print("************************")
+print("--input is an iterable argument list, for the function which has one scalar as its argument")
+start_exec1=time.time()
+start_cpu1=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+end_exec1=time.time()
+end_cpu1=time.process_time()
+print("the program execution time is", end_exec1-start_exec1)
+print("the cpu execution time is", end_cpu1-start_cpu1)
+
+print("--input is an iterable argument list, for the function which has multiple scalars as its arguments")
+a=np.arange(N)
+b=np.arange(N, 2*N, 1)
+start_exec2=time.time()
+start_cpu2=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
+end_exec2=time.time()
+end_cpu2=time.process_time()
+print("the program execution time is", end_exec2-start_exec2)
+print("the cpu execution time is", end_cpu2-start_cpu2)
+
+print("--inputs are an iterable argument list and a numpy array, for the function which has multiple arrays as its arguments")
+A=np.arange(N)
+b=np.arange(N, 2*N, 1)
+start_exec3=time.time()
+start_cpu3=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)( (i for i in b), A))
+end_exec3=time.time()
+end_cpu3=time.process_time()
+print("the program execution time is", end_exec3-start_exec3)
+print("the cpu execution time is", end_cpu3-start_cpu3)
+
+print("--input is an iterable argument list, for the function which has one array as its argument")
+a=np.arange(N)
+b=np.arange(N, 2*N, 1)
+start_exec4=time.time()
+start_cpu4=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
+end_exec4=time.time()
+end_cpu4=time.process_time()
+print("the program execution time is", end_exec4-start_exec4)
+print("the cpu execution time is", end_cpu4-start_cpu4)
+
+print("--inputs are multiple iterable argument lists, for the function which has multiple arrays as its arguments")
+a=np.arange(N)
+b=np.arange(N, 2*N, 1)
+start_exec5=time.time()
+start_cpu5=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
+end_exec5=time.time()
+end_cpu5=time.process_time()
+print("the program execution time is", end_exec5-start_exec5)
+print("the cpu execution time is", end_cpu5-start_cpu5)
+
+print("--inputs are multiple numpy arrays, for the function which has multiple arrays as its arguments")
+A=np.arange(N)
+B=np.arange(N, 2*N, 1)
+print("The input arrays are A", A, "B", B)
+start_exec6=time.time()
+start_cpu6=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
+end_exec6=time.time()
+end_cpu6=time.process_time()
+print("the program execution time is", end_exec6-start_exec6)
+print("the cpu execution time is", end_cpu6-start_cpu6)
+print("The return arrays are A", A, "B", B)
 
+print("--inputs are a scalar and an iterable argument list, for the function which has a scalar and an array as its arguments")
+a=np.arange(N)
+start_exec7=time.time()
+start_cpu7=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
+end_exec7=time.time()
+end_cpu7=time.process_time()
+print("the program execution time is", end_exec7-start_exec7)
+print("the cpu execution time is", end_cpu7-start_cpu7)
 
-print("--input is numpy array")
+print("--inputs are a scalar and a numpy array, for the function which has a scalar and an array as its arguments")
+A=np.arange(N)
 print("The input array is", A)
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
+start_exec8=time.time()
+start_cpu8=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
+end_exec8=time.time()
+end_cpu8=time.process_time()
+print("the program execution time is", end_exec8-start_exec8)
+print("the cpu execution time is", end_cpu8-start_cpu8)
 print("The return array is", A)
 
+print("--inputs are a scalar and multiple numpy arrays, for the function which has a scalar and arrays as its arguments")
+A=np.arange(N)
+B=np.arange(N)
+print("The input arrays are A", A, "B", B)
+start_exec9=time.time()
+start_cpu9=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
+end_exec9=time.time()
+end_cpu9=time.process_time()
+print("the program execution time is", end_exec9-start_exec9)
+print("the cpu execution time is", end_cpu9-start_cpu9)
+print("The return arrays are A", A, "B", B)
+
+
+print("--input is an iterable function list")
+start_exec10=time.time()
+start_cpu10=time.process_time()
+starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="func")(g_func)
+end_exec10=time.time()
+end_cpu10=time.process_time()
+print("the program execution time is", end_exec10-start_exec10)
+print("the cpu execution time is", end_cpu10-start_cpu10)
+
+# def producer():
+# 	for i in range(6):
+# 		print('Produced %s' % i)
+# 		yield i
+#starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
+
 print("************************")
 print("parallel Future version:")
 print("************************")
 async def main():
-	print("--input is iterable argument list, example 1")
-	fut1=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+	print("--input is an iterable argument list, for the function which has one scalar as its argument")
+	fut1=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
 	#print(res1)
 
-	print("--input is iterable argument list, example 2, with multi input")
-	a=np.arange(10)
-	b=np.arange(10)
-	fut2=starpu.joblib.Parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
+	print("--input is an iterable argument list, for the function which has multiple scalars as its arguments")
+	a=np.arange(N)
+	b=np.arange(N, 2*N, 1)
+	fut2=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 	res2=await fut2
 	#print(res2)
 
-	print("--input is iterable function list")
-	fut3=starpu.joblib.Parallel(mode="future", n_jobs=2)(g_func)
+	print("--inputs are an iterable argument list and a numpy array, for the function which has multiple arrays as its arguments")
+	A=np.arange(N)
+	b=np.arange(N, 2*N, 1)
+	fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)( (i for i in b), A))
 	res3=await fut3
 	#print(res3)
 
-	print("--input is numpy array")
-	print("The input array is", A)
-	fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
+	print("--input is an iterable argument list, for the function which has one array as its argument")
+	a=np.arange(N)
+	b=np.arange(N, 2*N, 1)
+	fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
 	res4=await fut4
+	#print(res4)
+
+	print("--inputs are multiple iterable argument lists, for the function which has multiple arrays as its arguments")
+	a=np.arange(N)
+	b=np.arange(N, 2*N, 1)
+	fut5=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
+	res5=await fut5
+	#print(res5)
+
+	print("--inputs are multiple numpy arrays, for the function which has multiple arrays as its arguments")
+	A=np.arange(N)
+	B=np.arange(N, 2*N, 1)
+	print("The input arrays are A", A, "B", B)
+	fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
+	res6=await fut6
+	print("The return arrays are A", A, "B", B)
+
+
+	print("--inputs are a scalar and an iterable argument list, for the function which has a scalar and an array as its arguments")
+	a=np.arange(N)
+	fut7=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
+	res7=await fut7
+	#print(res7)
+
+	print("--inputs are a scalar and a numpy array, for the function which has a scalar and an array as its arguments")
+	A=np.arange(N)
+	print("The input array is", A)
+	fut8=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
+	res8=await fut8
 	print("The return array is", A)
 
+	print("--inputs are a scalar and multiple numpy arrays, for the function which has a scalar and arrays as its arguments")
+	A=np.arange(N)
+	B=np.arange(N)
+	print("The input arrays are A", A, "B", B)
+	fut9=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
+	res9=await fut9
+	print("The return arrays are A", A, "B", B)
+
+	print("--input is an iterable function list")
+	fut10=starpu.joblib.Parallel(mode="future", n_jobs=2)(g_func)
+	res10=await fut10
+	#print(res10)
+
 asyncio.run(main())
 
-starpu.perfmodel_plot(perfmodel="first",view=False)
-starpu.perfmodel_plot(perfmodel="second",view=False)
-starpu.perfmodel_plot(perfmodel="third",view=False)
+starpu.perfmodel_plot(perfmodel="sqrt",view=False)
+starpu.perfmodel_plot(perfmodel="multi",view=False)
+starpu.perfmodel_plot(perfmodel="func",view=False)

+ 1 - 1
starpupy/src/__init__.py

@@ -17,5 +17,5 @@
 
 from.starpupy import *
 from .delay import *
-from . import joblib
+#from . import joblib
 from .intermedia import *

+ 46 - 19
starpupy/src/joblib.py

@@ -13,14 +13,16 @@
 #
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
-from starpu import starpupy
-import starpu
+
 import sys
 try:
    sys.path.remove('/usr/local/lib/python3.8/site-packages/starpu')
 except:
    pass
-import joblib
+import types
+import joblib as jl
+from starpu import starpupy
+import starpu
 import asyncio
 import math
 import functools
@@ -57,35 +59,60 @@ def future_generator(iterable, n_jobs, dict_task):
 	# iterable is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
 	#print("iterable type is ", type(iterable))
 	#print("iterable is", iterable)
-
 	# get the number of block
 	if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
-		print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
+		raise SystemExit('Error: n_jobs is out of range')
+		#print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
 	elif n_jobs<0:
 		n_block=cpu_count()+1+n_jobs
 	else:
 		n_block=n_jobs
 
-	# if arguments are not passed by a generator
+	# if the arguments are passed in tuple format
 	if type(iterable) is tuple:
+		# the function is always the first element
+		f=iterable[0]
+		# the list of arguments is always the second element
+		args=iterable[1]
+		# list of array lengths, used to check that all arrays have the same size
+		l_arr=[]
+		# list of Future result
 		L_fut=[]
+		# split each argument into n_block chunks
+		args_split=[]
+		for i in range(len(args)):
+			args_split.append([])
+			# if the argument is a numpy array
+			if type(args[i]) is np.ndarray:
+				# split numpy array
+				args_split[i]=np.array_split(args[i],n_block)
+				# get the length of numpy array
+				l_arr.append(args[i].size)
+			# if the argument is a generator
+			elif isinstance(args[i],types.GeneratorType):
+				# split generator
+				args_split[i]=partition(list(args[i]),n_block)
+				# get the length of generator
+				l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
+		if len(set(l_arr))>1:
+			raise SystemExit('Error: all arrays should have the same size')
+		#print("args list is", args_split)
 		for i in range(n_block):
+			# generate the argument list
 			L_args=[]
-			for j in range(len(iterable[1])):
-				if type(iterable[1][j]) is np.ndarray:
-					arr_split=np.array_split(iterable[1][j],n_block)
-					L_args.append(arr_split[i])
+			for j in range(len(args)):
+				if type(args[j]) is np.ndarray or isinstance(args[j],types.GeneratorType):
+					L_args.append(args_split[j][i])
 				else:
-					L_args.append(iterable[1][j])
+					L_args.append(args[j])
 			#print("L_args is", L_args)
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
 								   color=dict_task['color'], flops=dict_task['flops'])\
-				                  (iterable[0], *L_args)
+				                  (f, *L_args)
 			L_fut.append(fut)
 		return L_fut
 
 	# if iterable is a generator or a list of function
-	#elif isinstance(iterable,generator):
 	else:
 		L=list(iterable)
 		#print(L)
@@ -183,7 +210,7 @@ def delayed(f):
 
 
 ######################################################################
-class Memory(joblib.Memory):
+class Memory(jl.Memory):
 	def __init__(self,location=None, backend='local', cachedir=None,
                  mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
                  backend_options=None):
@@ -193,22 +220,22 @@ class Memory(joblib.Memory):
 
 
 def dump(value, filename, compress=0, protocol=None, cache_size=None):
-	return joblib.dump(value, filename, compress, protocol, cache_size)
+	return jl.dump(value, filename, compress, protocol, cache_size)
 
 def load(filename, mmap_mode=None):
-	return joblib.load(filename, mmap_mode)
+	return jl.load(filename, mmap_mode)
 
 def hash(obj, hash_name='md5', coerce_mmap=False):
-	return joblib.hash(obj, hash_name, coerce_mmap)
+	return jl.hash(obj, hash_name, coerce_mmap)
 
 def register_compressor(compressor_name, compressor, force=False):
-	return joblib.register_compressor(compressor_name, compressor, force)
+	return jl.register_compressor(compressor_name, compressor, force)
 
 def effective_n_jobs(n_jobs=-1):
 	return cpu_count()
 
 def get_active_backend(prefer=None, require=None, verbose=0):
-	return joblib.parallel.get_active_backend(prefer, require, verbose)
+	return jl.parallel.get_active_backend(prefer, require, verbose)
 
 class parallel_backend(object):
 	def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,