浏览代码

starpupy: add end_msg option in joblib.parallel, and add perfmodel_plot function

HE Kun 4 年之前
父节点
当前提交
d3c40b6a90
共有 2 个文件被更改,包括 37 次插入23 次删除
  1. 26 13
      starpupy/src/starpu/joblib.py
  2. 11 10
      starpupy/tests/starpu_py_parallel.py

+ 26 - 13
starpupy/src/starpu/joblib.py

@@ -65,7 +65,6 @@ def dict_perf_generator(perfsymbol):
 	return p
 
 def future_generator(g, n_jobs, perfsymbol):
-	p=dict_perf_generator(perfsymbol)
 	# g is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
 	L=list(g)
 	# generate a list of function according to g
@@ -80,7 +79,9 @@ def future_generator(g, n_jobs, perfsymbol):
 			L_func.append(f(*L_args))
 		return L_func
 	# get the number of block
-	if n_jobs<0:
+	if n_jobs<-n_cpus-1 or n_jobs>n_cpus:
+		print("Error: n_jobs is out of range, number of CPUs is", n_cpus)
+	elif n_jobs<0:
 		n_block=n_cpus+1+n_jobs
 	else:
 		n_block=n_jobs
@@ -89,11 +90,16 @@ def future_generator(g, n_jobs, perfsymbol):
 	# operation in each split list
 	L_fut=[]
 	for i in range(len(L_split)):
-		fut=starpupy.task_submit(lf, L_split[i], p.get_struct())
-		L_fut.append(fut)
+		if perfsymbol==None:
+			fut=starpupy.task_submit(lf, L_split[i])
+			L_fut.append(fut)
+		else:
+			p=dict_perf_generator(perfsymbol)
+			fut=starpupy.task_submit(lf, L_split[i], p.get_struct())
+			L_fut.append(fut)
 	return L_fut
 
-def parallel(*, mode, n_jobs=None, perfmodel, \
+def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,\
 	         backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
 	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
 	         mmap_mode='r', prefer=None, require=None):
@@ -106,8 +112,7 @@ def parallel(*, mode, n_jobs=None, perfmodel, \
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
 					res.extend(L_res)
-				print(res)
-				#p.free_struct()
+				#print(res)
 				return res
 			asyncio.run(asy_main())
 			return asy_main
@@ -117,7 +122,16 @@ def parallel(*, mode, n_jobs=None, perfmodel, \
 		def parallel_future(g):
 			L_fut=future_generator(g, n_jobs, perfmodel)
 			fut=asyncio.gather(*L_fut)
-			return fut
+			if end_msg==None:
+				return fut
+			else:
+				loop = asyncio.get_running_loop()
+				async def await_fut():
+					retval=await fut
+					print(end_msg)
+					return retval	
+				return loop.run_in_executor(None, await_fut)
+			#return fut
 		return parallel_future
 
 def delayed(f):
@@ -128,10 +142,9 @@ def delayed(f):
 
 ######################################################################
 # dump performance model
-def dump_perfmodel(perfmodel):
+def perfmodel_plot(perfmodel):
 	p=dict_perf[perfmodel]
 	starpupy.save_history_based_model(p.get_struct())
-
-# dump function
-def dump(value, filename, compress=0, protocol=None, cache_size=None):
-	pickle.dump(value, filename, protocol)
+	os.system('starpu_perfmodel_plot -s "' + perfmodel +'"')
+	os.system('gnuplot starpu_'+perfmodel+'.gp')
+	os.system('gv starpu_'+perfmodel+'.eps')

+ 11 - 10
starpupy/tests/starpu_py_parallel.py

@@ -36,7 +36,7 @@ g_func.append(starpu.joblib.delayed(func1)())
 def func2():
 	print ("Example 3:")
 	return 12
-g_func.append(starpu.joblib.delayed(func2)())	
+g_func.append(starpu.joblib.delayed(func2)())
  
 #function has 2 int inputs and 1 int output
 def multi(a,b):
@@ -61,7 +61,8 @@ def sub(a,b,c):
 g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
 
 #the size of generator
-N=1000
+N=1000000
+
 print("************************")
 print("parallel Normal version:")
 print("************************")
@@ -69,7 +70,7 @@ print("--input is iterable argument list, example 1")
 starpu.joblib.parallel(mode="normal", n_jobs=-2, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 
 print("--input is iterable argument list, example 2")
-starpu.joblib.parallel(mode="normal", n_jobs=-2, perfmodel="second")(starpu.joblib.delayed(log10)(10**i)for i in range(N))
+starpu.joblib.parallel(mode="normal", n_jobs=2, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
 
 print("--input is iterable function list")
 starpu.joblib.parallel(mode="normal", n_jobs=3, perfmodel="third")(g_func)
@@ -82,19 +83,19 @@ async def main():
 	print("--input is iterable argument list, example 1")
 	fut1=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="first")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
-	print(res1)
+	#print(res1)
 
 	print("--input is iterable argument list, example 2")
-	fut2=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(10**i)for i in range(N))
+	fut2=starpu.joblib.parallel(mode="future", n_jobs=-3, perfmodel="second")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
 	res2=await fut2
-	print(res2)
+	#print(res2)
 
 	print("--input is iterable function list")
 	fut3=starpu.joblib.parallel(mode="future", n_jobs=2, perfmodel="third")(g_func)
 	res3=await fut3
-	print(res3)
+	#print(res3)
 asyncio.run(main())
 
-starpu.joblib.dump_perfmodel(perfmodel="first")
-starpu.joblib.dump_perfmodel(perfmodel="second")
-starpu.joblib.dump_perfmodel(perfmodel="third")
+starpu.joblib.perfmodel_plot(perfmodel="first")
+starpu.joblib.perfmodel_plot(perfmodel="second")
+starpu.joblib.perfmodel_plot(perfmodel="third")