Ver código fonte

starpupy: add examples of numpy array

HE Kun 4 anos atrás
pai
commit
396370c40a

+ 5 - 1
starpupy/examples/starpu_py.py

@@ -93,7 +93,9 @@ def sub_deco(x,a):
 ###############################################################################
 
 def scal(a, t):
-    return t*a
+	for i in range(len(t)):
+		t[i]=t[i]*a
+	return t
 
 t=np.array([1,2,3,4,5,6,7,8,9,10])
 
@@ -144,6 +146,8 @@ async def main():
     fut8 = starpu.task_submit()(scal, 2, t)
     res8 = await fut8
     print("The result of Example 10 is", res8)
+    print("The return array is", t)
+    #print("The result type is", type(res8))
 
 asyncio.run(main())
 

+ 16 - 3
starpupy/examples/starpu_py_parallel.py

@@ -67,7 +67,9 @@ N=100
 # print(type(a))
 
 def scal(a, t):
-    return t*a
+	for i in range(len(t)):
+		t[i]=t[i]*a
+	return t
 
 A=np.arange(N)
 
@@ -108,10 +110,20 @@ b=np.arange(10)
 starpu.joblib.Parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 
 print("--input is iterable function list")
-starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
+p=starpu.joblib.Parallel(mode="normal", n_jobs=3, perfmodel="third")
+p(g_func)
+
+def producer():
+	for i in range(6):
+		print('Produced %s' % i)
+		yield i
+#starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
+
 
 print("--input is numpy array")
+print("The input array is", A)
 starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
+print("The return array is", A)
 
 print("************************")
 print("parallel Future version:")
@@ -135,9 +147,10 @@ async def main():
 	#print(res3)
 
 	print("--input is numpy array")
+	print("The input array is", A)
 	fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="array")(starpu.joblib.delayed(scal)(2,A))
 	res4=await fut4
-	#print(res4)
+	print("The return array is", A)
 
 asyncio.run(main())
 

+ 10 - 8
starpupy/src/joblib.py

@@ -55,8 +55,8 @@ def partition(ls, n_block):
 
 def future_generator(iterable, n_jobs, dict_task):
 	# iterable is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
-	print("iterable type is ", type(iterable))
-	print("iterable is", iterable)
+	#print("iterable type is ", type(iterable))
+	#print("iterable is", iterable)
 
 	# get the number of block
 	if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
@@ -77,8 +77,10 @@ def future_generator(iterable, n_jobs, dict_task):
 					L_args.append(arr_split[i])
 				else:
 					L_args.append(iterable[1][j])
-			print("L_args is", L_args)
-			fut=starpu.task_submit()(iterable[0], *L_args)
+			#print("L_args is", L_args)
+			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
+								   color=dict_task['color'], flops=dict_task['flops'])\
+				                  (iterable[0], *L_args)
 			L_fut.append(fut)
 		return L_fut
 
@@ -142,7 +144,8 @@ class Parallel(object):
 		self._backend=backend
 
 	def print_progress(self):
-		pass
+		#pass
+		print("", starpupy.task_nsubmitted())
 
 	def __call__(self,iterable):
 		#generate the dictionary of task_submit
@@ -154,12 +157,11 @@ class Parallel(object):
 			async def asy_main():
 				L_fut=future_generator(iterable, self.n_jobs, dict_task)
 				res=[]
-				print(L_fut)
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
-					print(L_res)
 					res.extend(L_res)
-				print(res)
+				#print(res)
+				#print("type of result is", type(res))
 				return res
 			asyncio.run(asy_main())
 			retVal=asy_main