starpu_py_parallel.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import starpu
  17. import starpu.joblib
  18. import time
  19. import asyncio
  20. from math import sqrt
  21. from math import log10
  22. import numpy as np
# List collecting the delayed function calls built below; the whole list is
# later passed to starpu.joblib.Parallel as a single iterable.
g_func=[]
  25. #function no input no output print hello world
  26. def hello():
  27. print ("Example 1: Hello, world!")
# Wrap hello (called with no arguments) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(hello)())
  29. #function no input no output
  30. def func1():
  31. print ("Example 2: This is a function no input no output")
# Wrap func1 (called with no arguments) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(func1)())
  33. #function no input return a value
  34. def func2():
  35. print ("Example 3:")
  36. return 12
# Wrap func2 (called with no arguments) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(func2)())
  38. #function has 2 int inputs and 1 int output
  39. def exp(a,b):
  40. res_exp=a**b
  41. print("Example 4: The result of ",a,"^",b,"is",res_exp)
  42. return res_exp
# Wrap the call exp(2, 3) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(exp)(2, 3))
  44. #function has 4 float inputs and 1 float output
  45. def add(a,b,c,d):
  46. res_add=a+b+c+d
  47. print("Example 5: The result of ",a,"+",b,"+",c,"+",d,"is",res_add)
  48. return res_add
# Wrap the call add(1.2, 2.5, 3.6, 4.9) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(add)(1.2, 2.5, 3.6, 4.9))
  50. #function has 2 int inputs 1 float input and 1 float output 1 int output
  51. def sub(a,b,c):
  52. res_sub1=a-b-c
  53. res_sub2=a-b
  54. print ("Example 6: The result of ",a,"-",b,"-",c,"is",res_sub1,"and the result of",a,"-",b,"is",res_sub2)
  55. return res_sub1, res_sub2
# Wrap the call sub(6, 2, 5.9) with starpu.joblib.delayed and queue it in g_func.
g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
  57. ##########functions of array calculation###############
  58. def scal(a, t):
  59. for i in range(len(t)):
  60. t[i]=t[i]*a
  61. return t
  62. def add_scal(a, t1, t2):
  63. for i in range(len(t1)):
  64. t1[i]=t1[i]*a+t2[i]
  65. return t1
  66. def scal_arr(a, t):
  67. for i in range(len(t)):
  68. t[i]=t[i]*a[i]
  69. return t
  70. def multi(a,b):
  71. res_multi=a*b
  72. return res_multi
  73. def multi_2arr(a, b):
  74. for i in range(len(a)):
  75. a[i]=a[i]*b[i]
  76. return a
  77. def multi_list(l):
  78. res = []
  79. for (a,b) in l:
  80. res.append(a*b)
  81. return res
  82. ########################################################
  83. #################scikit test###################
# Class of the first element returned by starpu.joblib.get_active_backend();
# it serves as the base class of MyBackend.
DEFAULT_JOBLIB_BACKEND = starpu.joblib.get_active_backend()[0].__class__
class MyBackend(DEFAULT_JOBLIB_BACKEND): # type: ignore
    """Backend subclass that counts how many times start_call is invoked.

    All other behaviour is inherited from DEFAULT_JOBLIB_BACKEND.
    """
    def __init__(self, *args, **kwargs):
        """Set the call counter to zero, then forward all arguments to the parent."""
        # The counter is created before the parent constructor runs.
        self.count = 0
        super().__init__(*args, **kwargs)
    def start_call(self):
        """Increment the counter and return whatever the parent start_call returns."""
        self.count += 1
        return super().start_call()
# Register MyBackend under the name 'testing', then select that backend by
# name and print the two values the context manager yields.
starpu.joblib.register_parallel_backend('testing', MyBackend)
with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
    print("backend and n_jobs is", ba, n_jobs)
  95. ###############################################
# Problem size used by the examples below: the range length and the array length.
N=100
# Disabled experiments, kept for reference (their original indentation is lost):
# A=np.arange(N)
# B=np.arange(N)
# a=np.arange(N)
# b=np.arange(N, 2*N, 1)
#starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(N))
# for x in [10, 100, 1000, 10000, 100000, 1000000]:
# for X2 in range(x, x*10, x):
# starpu.joblib.Parallel(mode="normal", n_jobs=2, perfmodel="log")(starpu.joblib.delayed(log10)(i+1)for i in range(X2))
# print(range(X2))
print("************************")
print("parallel Normal version:")
print("************************")
# Example: one delayed sqrt(i**2) call per i in range(N).
print("--(sqrt)(i**2)for i in range(N)")
# time.time() is wall-clock time; time.process_time() is CPU time of this process.
start_exec1=time.time()
start_cpu1=time.process_time()
# perfmodel="sqrt" is the name later passed to starpu.perfmodel_plot.
# NOTE(review): n_jobs=-1 presumably means "use every available worker", as in joblib -- confirm for starpu.joblib.
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
end_exec1=time.time()
end_cpu1=time.process_time()
print("the program execution time is", end_exec1-start_exec1)
print("the cpu execution time is", end_cpu1-start_cpu1)
# Example: one delayed multi(i, j) call per pair produced by zip(a, b).
print("--(multi)(i,j) for i,j in zip(a,b)")
a=np.arange(N)
b=np.arange(N, 2*N, 1)
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec2=time.time()
start_cpu2=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
end_exec2=time.time()
end_cpu2=time.process_time()
print("the program execution time is", end_exec2-start_exec2)
print("the cpu execution time is", end_cpu2-start_cpu2)
# Example: a single delayed scal_arr call whose first argument is a generator
# expression over b and whose second argument is the array A.
print("--(scal_arr)((i for i in b), A)")
A=np.arange(N)
b=np.arange(N, 2*N, 1)
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec3=time.time()
start_cpu3=time.process_time()
# NOTE(review): scal_arr indexes its first argument, which a plain generator
# does not support; presumably starpu.joblib converts or splits generator
# arguments before calling the function -- confirm.
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
end_exec3=time.time()
end_cpu3=time.process_time()
print("the program execution time is", end_exec3-start_exec3)
print("the cpu execution time is", end_cpu3-start_cpu3)
# Example: a single delayed multi_list call fed a generator of (i, j) pairs.
print("--(multi_list)((i,j) for i,j in zip(a,b))")
a=np.arange(N)
b=np.arange(N, 2*N, 1)
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec4=time.time()
start_cpu4=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
end_exec4=time.time()
end_cpu4=time.process_time()
print("the program execution time is", end_exec4-start_exec4)
print("the cpu execution time is", end_cpu4-start_cpu4)
# Example: a single delayed multi_2arr call with both arguments given as
# generator expressions (over a and over b).
print("--(multi_2arr)((i for i in a), (j for j in b))")
a=np.arange(N)
b=np.arange(N, 2*N, 1)
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec5=time.time()
start_cpu5=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
end_exec5=time.time()
end_cpu5=time.process_time()
print("the program execution time is", end_exec5-start_exec5)
print("the cpu execution time is", end_cpu5-start_cpu5)
# Example: a single delayed multi_2arr call with the arrays A and B passed directly.
print("--(multi_2arr)(A, B)")
A=np.arange(N)
B=np.arange(N, 2*N, 1)
# A and B are printed before and after the run so any change to them is visible.
print("The input arrays are A", A, "B", B)
start_exec6=time.time()
start_cpu6=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(A, B))
end_exec6=time.time()
end_cpu6=time.process_time()
print("the program execution time is", end_exec6-start_exec6)
print("the cpu execution time is", end_cpu6-start_cpu6)
print("The return arrays are A", A, "B", B)
# Example: a single delayed scal call with factor 2 and t given, by keyword,
# as a generator expression over a.
print("--(scal)(2, t=(j for j in a))")
a=np.arange(N)
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec7=time.time()
start_cpu7=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, t=(j for j in a)))
end_exec7=time.time()
end_cpu7=time.process_time()
print("the program execution time is", end_exec7-start_exec7)
print("the cpu execution time is", end_cpu7-start_cpu7)
# Example: a single delayed scal call with factor 2 and the array A passed directly.
print("--(scal)(2,A)")
A=np.arange(N)
# A is printed before and after the run so any change to it is visible.
print("The input array is", A)
start_exec8=time.time()
start_cpu8=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
end_exec8=time.time()
end_cpu8=time.process_time()
print("the program execution time is", end_exec8-start_exec8)
print("the cpu execution time is", end_cpu8-start_cpu8)
print("The return array is", A)
# Example: a single delayed add_scal call with every argument passed by keyword.
print("--(add_scal)(t1=A,t2=B,a=2)")
A=np.arange(N)
B=np.arange(N)
# A and B are printed before and after the run so any change to them is visible.
print("The input arrays are A", A, "B", B)
start_exec9=time.time()
start_cpu9=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(t1=A,t2=B,a=2))
end_exec9=time.time()
end_cpu9=time.process_time()
print("the program execution time is", end_exec9-start_exec9)
print("the cpu execution time is", end_cpu9-start_cpu9)
print("The return arrays are A", A, "B", B)
# Example: the pre-built list g_func of delayed calls is passed to Parallel directly.
print("--input is iterable function list")
# Wall-clock and process-CPU timestamps bracket the parallel run.
start_exec10=time.time()
start_cpu10=time.process_time()
starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="func")(g_func)
end_exec10=time.time()
end_cpu10=time.process_time()
print("the program execution time is", end_exec10-start_exec10)
print("the cpu execution time is", end_cpu10-start_cpu10)
# Disabled example feeding Parallel from a generator function, kept for
# reference (its original indentation is lost):
# def producer():
# for i in range(6):
# print('Produced %s' % i)
# yield i
#starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
# Banner announcing the future-mode examples.
print("************************")
print("parallel Future version:")
print("************************")
  217. async def main():
  218. print("--(sqrt)(i**2)for i in range(N)")
  219. fut1=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
  220. res1=await fut1
  221. #print(res1)
  222. print("--(multi)(i,j) for i,j in zip(a,b)")
  223. a=np.arange(N)
  224. b=np.arange(N, 2*N, 1)
  225. fut2=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
  226. res2=await fut2
  227. #print(res2)
  228. print("--(scal_arr)((i for i in b), A)")
  229. A=np.arange(N)
  230. b=np.arange(N, 2*N, 1)
  231. fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
  232. res3=await fut3
  233. #print(res3)
  234. print("--(multi_list)((i,j) for i,j in zip(a,b))")
  235. a=np.arange(N)
  236. b=np.arange(N, 2*N, 1)
  237. fut4=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
  238. res4=await fut4
  239. #print(res4)
  240. print("--(multi_2arr)((i for i in a), (j for j in b))")
  241. a=np.arange(N)
  242. b=np.arange(N, 2*N, 1)
  243. fut5=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
  244. res5=await fut5
  245. #print(res5)
  246. print("--(multi_2arr)(b=B, a=A)")
  247. A=np.arange(N)
  248. B=np.arange(N, 2*N, 1)
  249. print("The input arrays are A", A, "B", B)
  250. fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
  251. res6=await fut6
  252. print("The return arrays are A", A, "B", B)
  253. print("--(scal)(2, (j for j in a))")
  254. a=np.arange(N)
  255. fut7=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
  256. res7=await fut7
  257. #print(res6)
  258. print("--(scal)(2,t=A)")
  259. A=np.arange(N)
  260. print("The input array is", A)
  261. fut8=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="scal")(starpu.joblib.delayed(scal)(2,t=A))
  262. res8=await fut8
  263. print("The return array is", A)
  264. print("--(scal)(2,A,B)")
  265. A=np.arange(N)
  266. B=np.arange(N)
  267. print("The input arrays are A", A, "B", B)
  268. fut9=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
  269. res9=await fut9
  270. print("The return arrays are A", A, "B", B)
  271. print("--input is iterable function list")
  272. fut10=starpu.joblib.Parallel(mode="future", n_jobs=2)(g_func)
  273. res10=await fut10
  274. #print(res9)
# Run the future-mode examples to completion; asyncio.run creates a new event
# loop for the coroutine and closes it afterwards.
asyncio.run(main())
# Plot the performance model stored under each perfmodel name used by the
# Parallel calls above.
# NOTE(review): view=False is passed for only some of the models; presumably
# the calls without it also display the plot -- confirm against starpu.perfmodel_plot.
starpu.perfmodel_plot(perfmodel="sqrt",view=False)
starpu.perfmodel_plot(perfmodel="multi",view=False)
starpu.perfmodel_plot(perfmodel="scal_arr",view=False)
starpu.perfmodel_plot(perfmodel="multi_list",view=False)
starpu.perfmodel_plot(perfmodel="multi_2arr")
starpu.perfmodel_plot(perfmodel="scal")
starpu.perfmodel_plot(perfmodel="add_scal")
starpu.perfmodel_plot(perfmodel="func",view=False)