starpu_py_parallel.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import starpu
  17. import starpu.joblib
  18. from starpu import starpupy
  19. import time
  20. import asyncio
  21. from math import sqrt
  22. from math import log10
  23. import numpy as np
  24. import sys
  25. #generate a list to store functions
  26. g_func=[]
  27. #function no input no output print hello world
  28. def hello():
  29. print ("Example 1: Hello, world!")
  30. g_func.append(starpu.joblib.delayed(hello)())
  31. #function no input no output
  32. def func1():
  33. print ("Example 2: This is a function no input no output")
  34. g_func.append(starpu.joblib.delayed(func1)())
  35. #function no input return a value
  36. def func2():
  37. print ("Example 3:")
  38. return 12
  39. g_func.append(starpu.joblib.delayed(func2)())
  40. #function has 2 int inputs and 1 int output
  41. def exp(a,b):
  42. res_exp=a**b
  43. print("Example 4: The result of ",a,"^",b,"is",res_exp)
  44. return res_exp
  45. g_func.append(starpu.joblib.delayed(exp)(2, 3))
  46. #function has 4 float inputs and 1 float output
  47. def add(a,b,c,d):
  48. res_add=a+b+c+d
  49. print("Example 5: The result of ",a,"+",b,"+",c,"+",d,"is",res_add)
  50. return res_add
  51. g_func.append(starpu.joblib.delayed(add)(1.2, 2.5, 3.6, 4.9))
  52. #function has 2 int inputs 1 float input and 1 float output 1 int output
  53. def sub(a,b,c):
  54. res_sub1=a-b-c
  55. res_sub2=a-b
  56. print ("Example 6: The result of ",a,"-",b,"-",c,"is",res_sub1,"and the result of",a,"-",b,"is",res_sub2)
  57. return res_sub1, res_sub2
  58. g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
  59. ##########functions of array calculation###############
  60. def scal(a, t):
  61. for i in range(len(t)):
  62. t[i]=t[i]*a
  63. return t
  64. def add_scal(a, t1, t2):
  65. for i in range(len(t1)):
  66. t1[i]=t1[i]*a+t2[i]
  67. return t1
  68. def scal_arr(a, t):
  69. for i in range(len(t)):
  70. t[i]=t[i]*a[i]
  71. return t
  72. def multi(a,b):
  73. res_multi=a*b
  74. return res_multi
  75. def multi_2arr(a, b):
  76. for i in range(len(a)):
  77. a[i]=a[i]*b[i]
  78. return a
  79. def multi_list(l):
  80. res = []
  81. for (a,b) in l:
  82. res.append(a*b)
  83. return res
  84. def log10_arr(t):
  85. for i in range(len(t)):
  86. t[i]=log10(t[i])
  87. return t
  88. ########################################################
  89. #################scikit test###################
  90. # DEFAULT_JOBLIB_BACKEND = starpu.joblib.get_active_backend()[0].__class__
  91. # class MyBackend(DEFAULT_JOBLIB_BACKEND): # type: ignore
  92. # def __init__(self, *args, **kwargs):
  93. # self.count = 0
  94. # super().__init__(*args, **kwargs)
  95. # def start_call(self):
  96. # self.count += 1
  97. # return super().start_call()
  98. # starpu.joblib.register_parallel_backend('testing', MyBackend)
  99. # with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
  100. # print("backend and n_jobs is", ba, n_jobs)
  101. ###############################################
  102. N=100
  103. # A=np.arange(N)
  104. # B=np.arange(N)
  105. # a=np.arange(N)
  106. # b=np.arange(N, 2*N, 1)
  107. displayPlot=False
  108. listX=[10, 100]
  109. for arg in sys.argv[1:]:
  110. if arg == "-long":
  111. listX = [10, 100, 1000, 10000, 100000, 1000000, 10000000]
  112. if arg == "-plot":
  113. displayPlot=True
  114. for x in listX:
  115. for X in range(x, x*10, x):
  116. #print("X=",X)
  117. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_list")(starpu.joblib.delayed(log10)(i+1)for i in range(X))
  118. A=np.arange(1,X+1,1)
  119. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_arr")(starpu.joblib.delayed(log10_arr)(A))
  120. print("************************")
  121. print("parallel Normal version:")
  122. print("************************")
  123. print("--(sqrt)(i**2)for i in range(N)")
  124. start_exec1=time.time()
  125. start_cpu1=time.process_time()
  126. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
  127. end_exec1=time.time()
  128. end_cpu1=time.process_time()
  129. print("the program execution time is", end_exec1-start_exec1)
  130. print("the cpu execution time is", end_cpu1-start_cpu1)
  131. print("--(multi)(i,j) for i,j in zip(a,b)")
  132. a=np.arange(N)
  133. b=np.arange(N, 2*N, 1)
  134. start_exec2=time.time()
  135. start_cpu2=time.process_time()
  136. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
  137. end_exec2=time.time()
  138. end_cpu2=time.process_time()
  139. print("the program execution time is", end_exec2-start_exec2)
  140. print("the cpu execution time is", end_cpu2-start_cpu2)
  141. print("--(scal_arr)((i for i in b), A)")
  142. A=np.arange(N)
  143. b=np.arange(N, 2*N, 1)
  144. start_exec3=time.time()
  145. start_cpu3=time.process_time()
  146. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
  147. end_exec3=time.time()
  148. end_cpu3=time.process_time()
  149. print("the program execution time is", end_exec3-start_exec3)
  150. print("the cpu execution time is", end_cpu3-start_cpu3)
  151. print("--(multi_list)((i,j) for i,j in zip(a,b))")
  152. a=np.arange(N)
  153. b=np.arange(N, 2*N, 1)
  154. start_exec4=time.time()
  155. start_cpu4=time.process_time()
  156. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
  157. end_exec4=time.time()
  158. end_cpu4=time.process_time()
  159. print("the program execution time is", end_exec4-start_exec4)
  160. print("the cpu execution time is", end_cpu4-start_cpu4)
  161. print("--(multi_2arr)((i for i in a), (j for j in b))")
  162. a=np.arange(N)
  163. b=np.arange(N, 2*N, 1)
  164. start_exec5=time.time()
  165. start_cpu5=time.process_time()
  166. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
  167. end_exec5=time.time()
  168. end_cpu5=time.process_time()
  169. print("the program execution time is", end_exec5-start_exec5)
  170. print("the cpu execution time is", end_cpu5-start_cpu5)
  171. print("--(multi_2arr)(A, B)")
  172. # A=np.arange(N)
  173. # B=np.arange(N, 2*N, 1)
  174. n, m = 4, 5
  175. A = np.arange(n*m).reshape(n, m)
  176. B = np.arange(n*m, 2*n*m, 1).reshape(n, m)
  177. print("The input arrays are A", A, "B", B)
  178. start_exec6=time.time()
  179. start_cpu6=time.process_time()
  180. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(A, B))
  181. end_exec6=time.time()
  182. end_cpu6=time.process_time()
  183. print("the program execution time is", end_exec6-start_exec6)
  184. print("the cpu execution time is", end_cpu6-start_cpu6)
  185. print("The return arrays are A", A, "B", B)
  186. print("--(scal)(2, t=(j for j in a))")
  187. a=np.arange(N)
  188. start_exec7=time.time()
  189. start_cpu7=time.process_time()
  190. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, t=(j for j in a)))
  191. end_exec7=time.time()
  192. end_cpu7=time.process_time()
  193. print("the program execution time is", end_exec7-start_exec7)
  194. print("the cpu execution time is", end_cpu7-start_cpu7)
  195. print("--(scal)(2,A)")
  196. A=np.arange(N)
  197. print("The input array is", A)
  198. start_exec8=time.time()
  199. start_cpu8=time.process_time()
  200. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
  201. end_exec8=time.time()
  202. end_cpu8=time.process_time()
  203. print("the program execution time is", end_exec8-start_exec8)
  204. print("the cpu execution time is", end_cpu8-start_cpu8)
  205. print("The return array is", A)
  206. print("--(add_scal)(t1=A,t2=B,a=2)")
  207. A=np.arange(N)
  208. B=np.arange(N)
  209. print("The input arrays are A", A, "B", B)
  210. start_exec9=time.time()
  211. start_cpu9=time.process_time()
  212. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(t1=A,t2=B,a=2))
  213. end_exec9=time.time()
  214. end_cpu9=time.process_time()
  215. print("the program execution time is", end_exec9-start_exec9)
  216. print("the cpu execution time is", end_cpu9-start_cpu9)
  217. print("The return arrays are A", A, "B", B)
  218. print("--input is iterable function list")
  219. start_exec10=time.time()
  220. start_cpu10=time.process_time()
  221. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="func")(g_func)
  222. end_exec10=time.time()
  223. end_cpu10=time.process_time()
  224. print("the program execution time is", end_exec10-start_exec10)
  225. print("the cpu execution time is", end_cpu10-start_cpu10)
  226. # def producer():
  227. # for i in range(6):
  228. # print('Produced %s' % i)
  229. # yield i
  230. #starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
  231. print("************************")
  232. print("parallel Future version:")
  233. print("************************")
  234. async def main():
  235. print("--(sqrt)(i**2)for i in range(N)")
  236. fut1=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
  237. res1=await fut1
  238. #print(res1)
  239. print("--(multi)(i,j) for i,j in zip(a,b)")
  240. a=np.arange(N)
  241. b=np.arange(N, 2*N, 1)
  242. fut2=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
  243. res2=await fut2
  244. #print(res2)
  245. print("--(scal_arr)((i for i in b), A)")
  246. A=np.arange(N)
  247. b=np.arange(N, 2*N, 1)
  248. fut3=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
  249. res3=await fut3
  250. #print(res3)
  251. print("--(multi_list)((i,j) for i,j in zip(a,b))")
  252. a=np.arange(N)
  253. b=np.arange(N, 2*N, 1)
  254. fut4=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
  255. res4=await fut4
  256. #print(res4)
  257. print("--(multi_2arr)((i for i in a), (j for j in b))")
  258. a=np.arange(N)
  259. b=np.arange(N, 2*N, 1)
  260. fut5=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
  261. res5=await fut5
  262. #print(res5)
  263. print("--(multi_2arr)(b=B, a=A)")
  264. A=np.arange(N)
  265. B=np.arange(N, 2*N, 1)
  266. print("The input arrays are A", A, "B", B)
  267. fut6=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
  268. res6=await fut6
  269. print("The return arrays are A", A, "B", B)
  270. print("--(scal)(2, (j for j in a))")
  271. a=np.arange(N)
  272. fut7=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
  273. res7=await fut7
  274. #print(res6)
  275. print("--(scal)(2,t=A)")
  276. A=np.arange(N)
  277. print("The input array is", A)
  278. fut8=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,t=A))
  279. res8=await fut8
  280. print("The return array is", A)
  281. print("--(scal)(2,A,B)")
  282. A=np.arange(N)
  283. B=np.arange(N)
  284. print("The input arrays are A", A, "B", B)
  285. fut9=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
  286. res9=await fut9
  287. print("The return arrays are A", A, "B", B)
  288. print("--input is iterable function list")
  289. fut10=starpu.joblib.Parallel(mode="future", n_jobs=-1)(g_func)
  290. res10=await fut10
  291. #print(res9)
  292. asyncio.run(main())
  293. starpu.perfmodel_plot(perfmodel="sqrt",view=displayPlot)
  294. starpu.perfmodel_plot(perfmodel="multi",view=displayPlot)
  295. starpu.perfmodel_plot(perfmodel="scal_arr",view=displayPlot)
  296. starpu.perfmodel_plot(perfmodel="multi_list",view=displayPlot)
  297. starpu.perfmodel_plot(perfmodel="multi_2arr",view=displayPlot)
  298. starpu.perfmodel_plot(perfmodel="scal",view=displayPlot)
  299. starpu.perfmodel_plot(perfmodel="add_scal",view=displayPlot)
  300. starpu.perfmodel_plot(perfmodel="func",view=displayPlot)
  301. starpu.perfmodel_plot(perfmodel="log_list",view=displayPlot)
  302. starpu.perfmodel_plot(perfmodel="log_arr",view=displayPlot)
  303. starpupy.shutdown()