joblib.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import sys
  17. import types
  18. import joblib as jl
  19. from joblib import logger
  20. from joblib._parallel_backends import ParallelBackendBase
  21. from starpu import starpupy
  22. import starpu
  23. import asyncio
  24. import math
  25. import functools
  26. import numpy as np
  27. import inspect
  28. import threading
# Registry mapping backend names to backend factory classes.
# Empty by default; entries are added via register_parallel_backend().
BACKENDS={
#'loky': LokyBackend,
}
# Thread-local storage holding the active (backend, n_jobs) pair,
# set/restored by the parallel_backend context manager.
_backend = threading.local()
  33. # get the number of CPUs controlled by StarPU
  34. def cpu_count():
  35. n_cpus=starpupy.cpu_worker_get_count()
  36. return n_cpus
  37. # split a list ls into n_block numbers of sub-lists
  38. def partition(ls, n_block):
  39. if len(ls)>=n_block:
  40. # there are n1 sub-lists which contain q1 elements, and (n_block-n1) sublists which contain q2 elements (n1 can be 0)
  41. q1=math.ceil(len(ls)/n_block)
  42. q2=math.floor(len(ls)/n_block)
  43. n1=len(ls)%n_block
  44. #n2=n_block-n1
  45. # generate n1 sub-lists in L1, and (n_block-n1) sub-lists in L2
  46. L1=[ls[i:i+q1] for i in range(0, n1*q1, q1)]
  47. L2=[ls[i:i+q2] for i in range(n1*q1, len(ls), q2)]
  48. L=L1+L2
  49. else:
  50. # if the block number is larger than the length of list, each element in the list is a sub-list
  51. L=[ls[i:i+1] for i in range (len(ls))]
  52. return L
  53. # split a two-dimension numpy matrix into n_block numbers of sub-matrices
  54. def array2d_split(a, n_block):
  55. # decompose number of n_jobs to two integers multiply
  56. c_tmp=math.floor(math.sqrt(n_block))
  57. for i in range (c_tmp,0,-1):
  58. if n_block%i==0:
  59. c=i
  60. r=int(n_block/c)
  61. break
  62. # split column
  63. arr_split_c=np.array_split(a,c,0)
  64. arr_split=[]
  65. # split row
  66. for i in range(c):
  67. arr_split_r=np.array_split(arr_split_c[i],r,1)
  68. for j in range(r):
  69. arr_split.append(arr_split_r[j])
  70. return arr_split
  71. def future_generator(iterable, n_jobs, dict_task):
  72. # iterable is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
  73. #print("iterable type is ", type(iterable))
  74. #print("iterable is", iterable)
  75. # get the number of block
  76. if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
  77. raise SystemExit('Error: n_jobs is out of range')
  78. #print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
  79. elif n_jobs<0:
  80. n_block=cpu_count()+1+n_jobs
  81. else:
  82. n_block=n_jobs
  83. # if arguments is tuple format
  84. if type(iterable) is tuple:
  85. # the function is always the first element
  86. f=iterable[0]
  87. # get the name of formal arguments of f
  88. formal_args=inspect.getargspec(f).args
  89. # get the arguments list
  90. args=[]
  91. # argument is arbitrary in iterable[1]
  92. args=list(iterable[1])
  93. # argument is keyword argument in iterable[2]
  94. for i in range(len(formal_args)):
  95. for j in iterable[2].keys():
  96. if j==formal_args[i]:
  97. args.append(iterable[2][j])
  98. # check whether all arrays have the same size
  99. l_arr=[]
  100. # list of Future result
  101. L_fut=[]
  102. # split the vector
  103. args_split=[]
  104. for i in range(len(args)):
  105. args_split.append([])
  106. # if the array is an numpy array
  107. if type(args[i]) is np.ndarray:
  108. # one-dimension matrix
  109. if args[i].ndim==1:
  110. # split numpy array
  111. args_split[i]=np.array_split(args[i],n_block)
  112. # get the length of numpy array
  113. l_arr.append(args[i].size)
  114. # two-dimension matrix
  115. elif args[i].ndim==2:
  116. # split numpy 2D array
  117. args_split[i]=array2d_split(args[i],n_block)
  118. # if the array is a generator
  119. elif isinstance(args[i],types.GeneratorType):
  120. # split generator
  121. args_split[i]=partition(list(args[i]),n_block)
  122. # get the length of generator
  123. l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
  124. if len(set(l_arr))>1:
  125. raise SystemExit('Error: all arrays should have the same size')
  126. #print("args list is", args_split)
  127. for i in range(n_block):
  128. # generate the argument list
  129. L_args=[]
  130. sizebase=0
  131. for j in range(len(args)):
  132. if type(args[j]) is np.ndarray or isinstance(args[j],types.GeneratorType):
  133. L_args.append(args_split[j][i])
  134. if sizebase==0:
  135. sizebase=len(args_split[j][i])
  136. else:
  137. if sizebase==len(args_split[j][i]):
  138. continue
  139. else:
  140. raise SystemExit('Error: all arrays should be split into equal size')
  141. else:
  142. L_args.append(args[j])
  143. #print("L_args is", L_args)
  144. fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
  145. color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
  146. (f, *L_args)
  147. L_fut.append(fut)
  148. return L_fut
  149. # if iterable is a generator or a list of function
  150. else:
  151. L=list(iterable)
  152. #print(L)
  153. # generate a list of function according to iterable
  154. def lf(ls):
  155. L_func=[]
  156. for i in range(len(ls)):
  157. # the first element is the function
  158. f=ls[i][0]
  159. # the second element is the args list of a type tuple
  160. L_args=list(ls[i][1])
  161. # generate a list of function
  162. L_func.append(f(*L_args))
  163. return L_func
  164. # generate the split function list
  165. L_split=partition(L,n_block)
  166. # operation in each split list
  167. L_fut=[]
  168. for i in range(len(L_split)):
  169. sizebase=len(L_split[i])
  170. fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
  171. color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
  172. (lf, L_split[i])
  173. L_fut.append(fut)
  174. return L_fut
  175. class Parallel(object):
  176. def __init__(self, mode="normal", perfmodel=None, end_msg=None,\
  177. name=None, synchronous=0, priority=0, color=None, flops=None,\
  178. n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
  179. batch_size='auto', temp_folder=None, max_nbytes='1M',\
  180. mmap_mode='r', prefer=None, require=None):
  181. #active_backend= get_active_backend()
  182. # nesting_level = active_backend.nesting_level
  183. # if backend is None:
  184. # backend = active_backend
  185. # else:
  186. # try:
  187. # backend_factory = BACKENDS[backend]
  188. # except KeyError as e:
  189. # raise ValueError("Invalid backend: %s, expected one of %r"
  190. # % (backend, sorted(BACKENDS.keys()))) from e
  191. # backend = backend_factory(nesting_level=nesting_level)
  192. if n_jobs is None:
  193. n_jobs = 1
  194. self.mode=mode
  195. self.perfmodel=perfmodel
  196. self.end_msg=end_msg
  197. self.name=name
  198. self.synchronous=synchronous
  199. self.priority=priority
  200. self.color=color
  201. self.flops=flops
  202. self.n_jobs=n_jobs
  203. self._backend=backend
  204. def print_progress(self):
  205. #pass
  206. print("", starpupy.task_nsubmitted())
  207. def __call__(self,iterable):
  208. #generate the dictionary of task_submit
  209. dict_task={'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority, 'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
  210. if hasattr(self._backend, 'start_call'):
  211. self._backend.start_call()
  212. # the mode normal, user can call the function directly without using async
  213. if self.mode=="normal":
  214. async def asy_main():
  215. L_fut=future_generator(iterable, self.n_jobs, dict_task)
  216. res=[]
  217. for i in range(len(L_fut)):
  218. L_res=await L_fut[i]
  219. res.extend(L_res)
  220. #print(res)
  221. #print("type of result is", type(res))
  222. return res
  223. #asyncio.run(asy_main())
  224. #retVal=asy_main
  225. loop = asyncio.get_event_loop()
  226. results = loop.run_until_complete(asy_main())
  227. retVal = results
  228. # the mode future, user needs to use asyncio module and await the Future result in main function
  229. elif self.mode=="future":
  230. L_fut=future_generator(iterable, self.n_jobs, dict_task)
  231. fut=asyncio.gather(*L_fut)
  232. if self.end_msg!=None:
  233. fut.add_done_callback(functools.partial(print, self.end_msg))
  234. retVal=fut
  235. if hasattr(self._backend, 'stop_call'):
  236. self._backend.stop_call()
  237. return retVal
  238. def delayed(function):
  239. def delayed_function(*args, **kwargs):
  240. return function, args, kwargs
  241. return delayed_function
######################################################################
# Re-export the joblib version so this module can stand in for joblib.
__version__ = jl.__version__
  244. class Memory(jl.Memory):
  245. def __init__(self,location=None, backend='local', cachedir=None,
  246. mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
  247. backend_options=None):
  248. super(Memory, self).__init__(location=None, backend='local', cachedir=None,
  249. mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
  250. backend_options=None)
  251. def dump(value, filename, compress=0, protocol=None, cache_size=None):
  252. return jl.dump(value, filename, compress, protocol, cache_size)
  253. def load(filename, mmap_mode=None):
  254. return jl.load(filename, mmap_mode)
  255. def hash(obj, hash_name='md5', coerce_mmap=False):
  256. return jl.hash(obj, hash_name, coerce_mmap)
  257. def register_compressor(compressor_name, compressor, force=False):
  258. return jl.register_compressor(compressor_name, compressor, force)
# joblib API: the effective number of workers is always the StarPU CPU
# worker count here, regardless of the requested n_jobs.
def effective_n_jobs(n_jobs=-1):
    return cpu_count()
  261. def get_active_backend():
  262. backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
  263. if backend_and_jobs is not None:
  264. backend,n_jobs=backend_and_jobs
  265. return backend
  266. backend = BACKENDS[loky](nesting_level=0)
  267. return backend
  268. class parallel_backend(object):
  269. def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
  270. **backend_params):
  271. if isinstance(backend, str):
  272. backend = BACKENDS[backend](**backend_params)
  273. current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
  274. if backend.nesting_level is None:
  275. if current_backend_and_jobs is None:
  276. nesting_level = 0
  277. else:
  278. nesting_level = current_backend_and_jobs[0].nesting_level
  279. backend.nesting_level = nesting_level
  280. # Save the backends info and set the active backend
  281. self.old_backend_and_jobs = current_backend_and_jobs
  282. self.new_backend_and_jobs = (backend, n_jobs)
  283. _backend.backend_and_jobs = (backend, n_jobs)
  284. def __enter__(self):
  285. return self.new_backend_and_jobs
  286. def __exit__(self, type, value, traceback):
  287. self.unregister()
  288. def unregister(self):
  289. if self.old_backend_and_jobs is None:
  290. if getattr(_backend, 'backend_and_jobs', None) is not None:
  291. del _backend.backend_and_jobs
  292. else:
  293. _backend.backend_and_jobs = self.old_backend_and_jobs
  294. def register_parallel_backend(name, factory):
  295. BACKENDS[name] = factory