Kaynağa Gözat

starpupy: accept keyword arguments when submit a function as task

HE Kun 4 yıl önce
ebeveyn
işleme
fd97d1ef09

+ 32 - 32
starpupy/examples/starpu_py_parallel.py

@@ -115,7 +115,7 @@ with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
 ###############################################
 
 
-N=10000
+N=100
 # A=np.arange(N)
 # B=np.arange(N)
 # a=np.arange(N)
@@ -130,101 +130,101 @@ N=10000
 print("************************")
 print("parallel Normal version:")
 print("************************")
-print("--input is an iterable argument list, for the function which has one scalar as its argument")
+print("--(sqrt)(i**2)for i in range(N)")
 start_exec1=time.time()
 start_cpu1=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 end_exec1=time.time()
 end_cpu1=time.process_time()
 print("the program execution time is", end_exec1-start_exec1)
 print("the cpu execution time is", end_cpu1-start_cpu1)
 
-print("--inputs is an iterable argument list, for the function which has multiple scalars as its arguments ")
+print("--(multi)(i,j) for i,j in zip(a,b)")
 a=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec2=time.time()
 start_cpu2=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 end_exec2=time.time()
 end_cpu2=time.process_time()
 print("the program execution time is", end_exec2-start_exec2)
 print("the cpu execution time is", end_cpu2-start_cpu2)
 
-print("--inputs are iterable argument list and numpy array, for the function which has multiple arrays as its arguments")
+print("--(scal_arr)((i for i in b), A)")
 A=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec3=time.time()
 start_cpu3=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)( (i for i in b), A))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
 end_exec3=time.time()
 end_cpu3=time.process_time()
 print("the program execution time is", end_exec3-start_exec3)
 print("the cpu execution time is", end_cpu3-start_cpu3)
 
-print("--input is an iterable argument list, for the function which has one array as its argument")
+print("--(multi_list)((i,j) for i,j in zip(a,b))")
 a=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec4=time.time()
 start_cpu4=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
 end_exec4=time.time()
 end_cpu4=time.process_time()
 print("the program execution time is", end_exec4-start_exec4)
 print("the cpu execution time is", end_cpu4-start_cpu4)
 
-print("--input are multiple iterable argument lists, for the function which has multiple arrays as its arguments")
+print("--(multi_2arr)((i for i in a), (j for j in b))")
 a=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec5=time.time()
 start_cpu5=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
 end_exec5=time.time()
 end_cpu5=time.process_time()
 print("the program execution time is", end_exec5-start_exec5)
 print("the cpu execution time is", end_cpu5-start_cpu5)
 
-print("--input are multiple numpy arrays, for the function which has multiple arrays as its arguments")
+print("--(multi_2arr)(A, B)")
 A=np.arange(N)
 B=np.arange(N, 2*N, 1)
 print("The input arrays are A", A, "B", B)
 start_exec6=time.time()
 start_cpu6=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
 end_exec6=time.time()
 end_cpu6=time.process_time()
 print("the program execution time is", end_exec6-start_exec6)
 print("the cpu execution time is", end_cpu6-start_cpu6)
 print("The return arrays are A", A, "B", B)
 
-print("--input are scalar and iterable argument list, for the function which has scalar and array as its arguments")
+print("--(scal)(2, t=(j for j in a))")
 a=np.arange(N)
 start_exec7=time.time()
 start_cpu7=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, t=(j for j in a)))
 end_exec7=time.time()
 end_cpu7=time.process_time()
 print("the program execution time is", end_exec7-start_exec7)
 print("the cpu execution time is", end_cpu7-start_cpu7)
 
-print("--input are scalar and numpy array, for the function which has scalar and array as its arguments")
+print("--(scal)(2,A)")
 A=np.arange(N)
 print("The input array is", A)
 start_exec8=time.time()
 start_cpu8=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
 end_exec8=time.time()
 end_cpu8=time.process_time()
 print("the program execution time is", end_exec8-start_exec8)
 print("the cpu execution time is", end_cpu8-start_cpu8)
 print("The return array is", A)
 
-print("--input are scalar and multiple numpy arrays, for the function which has scalar and arrays as its arguments")
+print("--(add_scal)(t1=A,t2=B,a=2)")
 A=np.arange(N)
 B=np.arange(N)
 print("The input arrays are A", A, "B", B)
 start_exec9=time.time()
 start_cpu9=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(t1=A,t2=B,a=2))
 end_exec9=time.time()
 end_cpu9=time.process_time()
 print("the program execution time is", end_exec9-start_exec9)
@@ -235,7 +235,7 @@ print("The return arrays are A", A, "B", B)
 print("--input is iterable function list")
 start_exec10=time.time()
 start_cpu10=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="func")(g_func)
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="func")(g_func)
 end_exec10=time.time()
 end_cpu10=time.process_time()
 print("the program execution time is", end_exec10-start_exec10)
@@ -251,62 +251,62 @@ print("************************")
 print("parallel Future version:")
 print("************************")
 async def main():
-	print("--input is an iterable argument list, for the function which has one scalar as its argument")
+	print("--(sqrt)(i**2)for i in range(N)")
 	fut1=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
 	#print(res1)
 
-	print("--inputs is an iterable argument list, for the function which has multiple scalars as its arguments ")
+	print("--(multi)(i,j) for i,j in zip(a,b)")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
 	fut2=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 	res2=await fut2
 	#print(res2)
 
-	print("--inputs are iterable argument list and numpy array, for the function which has multiple arrays as its arguments")
+	print("--(scal_arr)((i for i in b), A)")
 	A=np.arange(N)
 	b=np.arange(N, 2*N, 1)
-	fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)( (i for i in b), A))
+	fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
 	res3=await fut3
 	#print(res3)
 
-	print("--input is an iterable argument list, for the function which has one array as its argument")
+	print("--(multi_list)((i,j) for i,j in zip(a,b))")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
 	fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
 	res4=await fut4
 	#print(res4)
 
-	print("--input are multiple iterable argument lists, for the function which has multiple arrays as its arguments")
+	print("--(multi_2arr)((i for i in a), (j for j in b))")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
 	fut5=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
 	res5=await fut5
 	#print(res5)
 
-	print("--input are multiple numpy arrays, for the function which has multiple arrays as its arguments")
+	print("--(multi_2arr)(b=B, a=A)")
 	A=np.arange(N)
 	B=np.arange(N, 2*N, 1)
 	print("The input arrays are A", A, "B", B)
-	fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
+	fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
 	res6=await fut6
 	print("The return arrays are A", A, "B", B)
 
 
-	print("--input are scalar and iterable argument list, for the function which has scalar and array as its arguments")
+	print("--(scal)(2, (j for j in a))")
 	a=np.arange(N)
 	fut7=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
 	res7=await fut7
 	#print(res6)
 
-	print("--input are scalar and numpy array, for the function which has scalar and array as its arguments")
+	print("--(scal)(2,t=A)")
 	A=np.arange(N)
 	print("The input array is", A)
-	fut8=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
+	fut8=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,t=A))
 	res8=await fut8
 	print("The return array is", A)
 
-	print("--input are scalar and multiple numpy arrays, for the function which has scalar and arrays as its arguments")
+	print("--(scal)(2,A,B)")
 	A=np.arange(N)
 	B=np.arange(N)
 	print("The input arrays are A", A, "B", B)

+ 1 - 1
starpupy/src/__init__.py

@@ -18,4 +18,4 @@
 from.starpupy import *
 from .delay import *
 #from . import joblib
-from .intermedia import *
+from .intermedia import *

+ 19 - 6
starpupy/src/joblib.py

@@ -27,6 +27,7 @@ import asyncio
 import math
 import functools
 import numpy as np
+import inspect
 import threading
 
 BACKENDS={}
@@ -72,8 +73,17 @@ def future_generator(iterable, n_jobs, dict_task):
 	if type(iterable) is tuple:
 		# the function is always the first element
 		f=iterable[0]
-		# the list of arguments is always the second element
-		args=iterable[1]
+		# get the name of formal arguments of f
+		formal_args=inspect.getargspec(f).args
+		# get the arguments list
+		args=[]
+		# argument is arbitrary in iterable[1]
+		args=list(iterable[1])
+		# argument is keyword argument in iterable[2]
+		for i in range(len(formal_args)):
+			for j in iterable[2].keys():
+				if j==formal_args[i]:
+					args.append(iterable[2][j])
 		# check whether all arrays have the same size
 		l_arr=[]
 		# list of Future result
@@ -159,6 +169,9 @@ class Parallel(object):
                                  % (backend, sorted(BACKENDS.keys()))) from e
 			backend = backend_factory(nesting_level=nesting_level)
 
+		if n_jobs is None:
+			n_jobs = 1
+
 		self.mode=mode
 		self.perfmodel=perfmodel
 		self.end_msg=end_msg
@@ -203,10 +216,10 @@ class Parallel(object):
 			self._backend.stop_call()
 		return retVal
 
-def delayed(f):
-	def delayed_func(*args):
-		return f, args
-	return delayed_func
+def delayed(function):
+	def delayed_function(*args, **kwargs):
+		return function, args, kwargs
+	return delayed_function
 
 
 ######################################################################