starpu_py_parallel.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import starpu
  17. import starpu.joblib
  18. from starpu import starpupy
  19. import time
  20. import asyncio
  21. from math import sqrt
  22. from math import log10
  23. try:
  24. import numpy as np
  25. except:
  26. exit(77)
  27. import sys
  28. #generate a list to store functions
  29. g_func=[]
  30. #function no input no output print hello world
  31. def hello():
  32. print ("Example 1: Hello, world!")
  33. g_func.append(starpu.joblib.delayed(hello)())
  34. #function no input no output
  35. def func1():
  36. print ("Example 2: This is a function no input no output")
  37. g_func.append(starpu.joblib.delayed(func1)())
  38. #function no input return a value
  39. def func2():
  40. print ("Example 3:")
  41. return 12
  42. g_func.append(starpu.joblib.delayed(func2)())
  43. #function has 2 int inputs and 1 int output
  44. def exp(a,b):
  45. res_exp=a**b
  46. print("Example 4: The result of ",a,"^",b,"is",res_exp)
  47. return res_exp
  48. g_func.append(starpu.joblib.delayed(exp)(2, 3))
  49. #function has 4 float inputs and 1 float output
  50. def add(a,b,c,d):
  51. res_add=a+b+c+d
  52. print("Example 5: The result of ",a,"+",b,"+",c,"+",d,"is",res_add)
  53. return res_add
  54. g_func.append(starpu.joblib.delayed(add)(1.2, 2.5, 3.6, 4.9))
  55. #function has 2 int inputs 1 float input and 1 float output 1 int output
  56. def sub(a,b,c):
  57. res_sub1=a-b-c
  58. res_sub2=a-b
  59. print ("Example 6: The result of ",a,"-",b,"-",c,"is",res_sub1,"and the result of",a,"-",b,"is",res_sub2)
  60. return res_sub1, res_sub2
  61. g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
  62. ##########functions of array calculation###############
  63. def scal(a, t):
  64. for i in range(len(t)):
  65. t[i]=t[i]*a
  66. return t
  67. def add_scal(a, t1, t2):
  68. for i in range(len(t1)):
  69. t1[i]=t1[i]*a+t2[i]
  70. return t1
  71. def scal_arr(a, t):
  72. for i in range(len(t)):
  73. t[i]=t[i]*a[i]
  74. return t
  75. def multi(a,b):
  76. res_multi=a*b
  77. return res_multi
  78. def multi_2arr(a, b):
  79. for i in range(len(a)):
  80. a[i]=a[i]*b[i]
  81. return a
  82. def multi_list(l):
  83. res = []
  84. for (a,b) in l:
  85. res.append(a*b)
  86. return res
  87. def log10_arr(t):
  88. for i in range(len(t)):
  89. t[i]=log10(t[i])
  90. return t
  91. ########################################################
  92. #################scikit test###################
  93. # DEFAULT_JOBLIB_BACKEND = starpu.joblib.get_active_backend()[0].__class__
  94. # class MyBackend(DEFAULT_JOBLIB_BACKEND): # type: ignore
  95. # def __init__(self, *args, **kwargs):
  96. # self.count = 0
  97. # super().__init__(*args, **kwargs)
  98. # def start_call(self):
  99. # self.count += 1
  100. # return super().start_call()
  101. # starpu.joblib.register_parallel_backend('testing', MyBackend)
  102. # with starpu.joblib.parallel_backend("testing") as (ba, n_jobs):
  103. # print("backend and n_jobs is", ba, n_jobs)
  104. ###############################################
  105. N=100
  106. # A=np.arange(N)
  107. # B=np.arange(N)
  108. # a=np.arange(N)
  109. # b=np.arange(N, 2*N, 1)
  110. displayPlot=False
  111. listX=[10, 100]
  112. for arg in sys.argv[1:]:
  113. if arg == "-long":
  114. listX = [10, 100, 1000, 10000, 100000, 1000000, 10000000]
  115. if arg == "-plot":
  116. displayPlot=True
  117. for x in listX:
  118. for X in range(x, x*10, x):
  119. #print("X=",X)
  120. try :
  121. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_list")(starpu.joblib.delayed(log10)(i+1)for i in range(X))
  122. A=np.arange(1,X+1,1)
  123. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_arr")(starpu.joblib.delayed(log10_arr)(A))
  124. except starpupy.error as e:
  125. print("No worker to execute the job")
  126. exit(77)
  127. print("************************")
  128. print("parallel Normal version:")
  129. print("************************")
  130. print("--(sqrt)(i**2)for i in range(N)")
  131. start_exec1=time.time()
  132. start_cpu1=time.process_time()
  133. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
  134. end_exec1=time.time()
  135. end_cpu1=time.process_time()
  136. print("the program execution time is", end_exec1-start_exec1)
  137. print("the cpu execution time is", end_cpu1-start_cpu1)
  138. print("--(multi)(i,j) for i,j in zip(a,b)")
  139. a=np.arange(N)
  140. b=np.arange(N, 2*N, 1)
  141. start_exec2=time.time()
  142. start_cpu2=time.process_time()
  143. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
  144. end_exec2=time.time()
  145. end_cpu2=time.process_time()
  146. print("the program execution time is", end_exec2-start_exec2)
  147. print("the cpu execution time is", end_cpu2-start_cpu2)
  148. print("--(scal_arr)((i for i in b), A)")
  149. A=np.arange(N)
  150. b=np.arange(N, 2*N, 1)
  151. start_exec3=time.time()
  152. start_cpu3=time.process_time()
  153. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
  154. end_exec3=time.time()
  155. end_cpu3=time.process_time()
  156. print("the program execution time is", end_exec3-start_exec3)
  157. print("the cpu execution time is", end_cpu3-start_cpu3)
  158. print("--(multi_list)((i,j) for i,j in zip(a,b))")
  159. a=np.arange(N)
  160. b=np.arange(N, 2*N, 1)
  161. start_exec4=time.time()
  162. start_cpu4=time.process_time()
  163. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
  164. end_exec4=time.time()
  165. end_cpu4=time.process_time()
  166. print("the program execution time is", end_exec4-start_exec4)
  167. print("the cpu execution time is", end_cpu4-start_cpu4)
  168. print("--(multi_2arr)((i for i in a), (j for j in b))")
  169. a=np.arange(N)
  170. b=np.arange(N, 2*N, 1)
  171. start_exec5=time.time()
  172. start_cpu5=time.process_time()
  173. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
  174. end_exec5=time.time()
  175. end_cpu5=time.process_time()
  176. print("the program execution time is", end_exec5-start_exec5)
  177. print("the cpu execution time is", end_cpu5-start_cpu5)
  178. print("--(multi_2arr)(A, B)")
  179. # A=np.arange(N)
  180. # B=np.arange(N, 2*N, 1)
  181. n, m = 4, 5
  182. A = np.arange(n*m).reshape(n, m)
  183. B = np.arange(n*m, 2*n*m, 1).reshape(n, m)
  184. start_exec6=time.time()
  185. start_cpu6=time.process_time()
  186. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(A, B))
  187. end_exec6=time.time()
  188. end_cpu6=time.process_time()
  189. print("the program execution time is", end_exec6-start_exec6)
  190. print("the cpu execution time is", end_cpu6-start_cpu6)
  191. print("--(scal)(2, t=(j for j in a))")
  192. a=np.arange(N)
  193. start_exec7=time.time()
  194. start_cpu7=time.process_time()
  195. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, t=(j for j in a)))
  196. end_exec7=time.time()
  197. end_cpu7=time.process_time()
  198. print("the program execution time is", end_exec7-start_exec7)
  199. print("the cpu execution time is", end_cpu7-start_cpu7)
  200. print("--(scal)(2,A)")
  201. A=np.arange(N)
  202. start_exec8=time.time()
  203. start_cpu8=time.process_time()
  204. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
  205. end_exec8=time.time()
  206. end_cpu8=time.process_time()
  207. print("the program execution time is", end_exec8-start_exec8)
  208. print("the cpu execution time is", end_cpu8-start_cpu8)
  209. print("--(add_scal)(t1=A,t2=B,a=2)")
  210. A=np.arange(N)
  211. B=np.arange(N)
  212. start_exec9=time.time()
  213. start_cpu9=time.process_time()
  214. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(t1=A,t2=B,a=2))
  215. end_exec9=time.time()
  216. end_cpu9=time.process_time()
  217. print("the program execution time is", end_exec9-start_exec9)
  218. print("the cpu execution time is", end_cpu9-start_cpu9)
  219. print("--input is iterable function list")
  220. start_exec10=time.time()
  221. start_cpu10=time.process_time()
  222. starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="func")(g_func)
  223. end_exec10=time.time()
  224. end_cpu10=time.process_time()
  225. print("the program execution time is", end_exec10-start_exec10)
  226. print("the cpu execution time is", end_cpu10-start_cpu10)
  227. # def producer():
  228. # for i in range(6):
  229. # print('Produced %s' % i)
  230. # yield i
  231. #starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(sqrt)(i) for i in producer())
  232. print("************************")
  233. print("parallel Future version:")
  234. print("************************")
  235. async def main():
  236. print("--(sqrt)(i**2)for i in range(N)")
  237. fut1=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
  238. res1=await fut1
  239. print("The result is", sum(res1,[]))
  240. print("--(multi)(i,j) for i,j in zip(a,b)")
  241. a=np.arange(N)
  242. b=np.arange(N, 2*N, 1)
  243. print("The inputs are a", a, "b", b)
  244. fut2=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
  245. res2=await fut2
  246. print("The result is", sum(res2,[]))
  247. print("--(scal_arr)((i for i in b), A)")
  248. A=np.arange(N)
  249. b=np.arange(N, 2*N, 1)
  250. print("The input arrays are A", A, "b", b)
  251. fut3=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
  252. res3=await fut3
  253. print("The return array is", np.concatenate(res3))
  254. print("--(multi_list)((i,j) for i,j in zip(a,b))")
  255. a=np.arange(N)
  256. b=np.arange(N, 2*N, 1)
  257. print("The input lists are a", a, "b", b)
  258. fut4=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
  259. res4=await fut4
  260. print("The result is", sum(res4,[]))
  261. print("--(multi_2arr)((i for i in a), (j for j in b))")
  262. a=np.arange(N)
  263. b=np.arange(N, 2*N, 1)
  264. print("The input lists are a", a, "b", b)
  265. fut5=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
  266. res5=await fut5
  267. print("The result is", sum(res5,[]))
  268. print("--(multi_2arr)(b=B, a=A)")
  269. A=np.arange(N)
  270. B=np.arange(N, 2*N, 1)
  271. print("The input arrays are A", A, "B", B)
  272. fut6=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
  273. res6=await fut6
  274. print("The return array is", np.concatenate(res6))
  275. print("--(scal)(2, (j for j in a))")
  276. a=np.arange(N)
  277. print("The input list is a", a)
  278. fut7=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
  279. res7=await fut7
  280. print("The result is", sum(res7,[]))
  281. print("--(scal)(2,t=A)")
  282. A=np.arange(N)
  283. print("The input array is", A)
  284. fut8=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,t=A))
  285. res8=await fut8
  286. print("The return array is", np.concatenate(res8))
  287. print("--(scal)(2,A,B)")
  288. A=np.arange(N)
  289. B=np.arange(N)
  290. print("The input arrays are A", A, "B", B)
  291. fut9=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
  292. res9=await fut9
  293. print("The return array is", np.concatenate(res9))
  294. print("--input is iterable function list")
  295. fut10=starpu.joblib.Parallel(mode="future", n_jobs=-1)(g_func)
  296. res10=await fut10
  297. #print(res9)
  298. try:
  299. asyncio.run(main())
  300. except starpupy.error as e:
  301. starpupy.shutdown()
  302. exit(77)
  303. starpu.perfmodel_plot(perfmodel="sqrt",view=displayPlot)
  304. starpu.perfmodel_plot(perfmodel="multi",view=displayPlot)
  305. starpu.perfmodel_plot(perfmodel="scal_arr",view=displayPlot)
  306. starpu.perfmodel_plot(perfmodel="multi_list",view=displayPlot)
  307. starpu.perfmodel_plot(perfmodel="multi_2arr",view=displayPlot)
  308. starpu.perfmodel_plot(perfmodel="scal",view=displayPlot)
  309. starpu.perfmodel_plot(perfmodel="add_scal",view=displayPlot)
  310. starpu.perfmodel_plot(perfmodel="func",view=displayPlot)
  311. starpu.perfmodel_plot(perfmodel="log_list",view=displayPlot)
  312. starpu.perfmodel_plot(perfmodel="log_arr",view=displayPlot)
  313. starpupy.shutdown()