joblib.py

# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
import sys
import types
import joblib as jl
from joblib import logger
from joblib._parallel_backends import ParallelBackendBase
from starpu import starpupy
import starpu
import asyncio
import math
import functools
try:
    import numpy as np
    has_numpy = True
except ImportError:
    has_numpy = False
import inspect
import threading

BACKENDS = {
    #'loky': LokyBackend,
}
_backend = threading.local()

# get the number of CPU workers controlled by StarPU
def cpu_count():
    n_cpus = starpupy.cpu_worker_get_count()
    return n_cpus
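
# For instance, with the environment variable STARPU_NCPU=4, StarPU creates
# four CPU workers and cpu_count() returns 4 (illustrative example).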

# split a list ls into n_block sub-lists
def partition(ls, n_block):
    if len(ls) >= n_block:
        # n1 sub-lists contain q1 elements, and (n_block-n1) sub-lists contain q2 elements (n1 can be 0)
        q1 = math.ceil(len(ls)/n_block)
        q2 = math.floor(len(ls)/n_block)
        n1 = len(ls) % n_block
        #n2=n_block-n1
        # generate the n1 sub-lists in L1, and the (n_block-n1) sub-lists in L2
        L1 = [ls[i:i+q1] for i in range(0, n1*q1, q1)]
        L2 = [ls[i:i+q2] for i in range(n1*q1, len(ls), q2)]
        L = L1+L2
    else:
        # if the number of blocks is larger than the length of the list, each element becomes its own sub-list
        L = [ls[i:i+1] for i in range(len(ls))]
    return L
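
# e.g. partition(list(range(7)), 3) -> [[0, 1, 2], [3, 4], [5, 6]]:
# one sub-list of ceil(7/3)=3 elements, then two of floor(7/3)=2 elements.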

# split a two-dimensional numpy array into n_block sub-matrices
def array2d_split(a, n_block):
    # decompose n_block into a product of two integers c*r, with c as close to sqrt(n_block) as possible
    c_tmp = math.floor(math.sqrt(n_block))
    for i in range(c_tmp, 0, -1):
        if n_block % i == 0:
            c = i
            r = int(n_block/c)
            break
    # split along axis 0 (rows) into c groups
    arr_split_c = np.array_split(a, c, 0)
    arr_split = []
    # split each group along axis 1 (columns) into r parts
    for i in range(c):
        arr_split_r = np.array_split(arr_split_c[i], r, 1)
        for j in range(r):
            arr_split.append(arr_split_r[j])
    return arr_split
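
# e.g. with a 4x6 array and n_block=6, c=2 and r=3: the array is cut into two
# row bands and each band into three column slices, yielding six 2x2
# sub-matrices (illustrative example).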

def future_generator(iterable, n_jobs, dict_task):
    # iterable is generated by the delayed function; it is either a single
    # (function, args, kwargs) tuple or a generator of such tuples
    # compute the number of blocks
    if n_jobs < -cpu_count()-1 or n_jobs > cpu_count():
        raise SystemExit('Error: n_jobs is out of range')
    elif n_jobs < 0:
        n_block = cpu_count()+1+n_jobs
    else:
        n_block = n_jobs
    if n_block <= 0:
        n_block = 1
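
    # Note: as in joblib, negative n_jobs counts backwards from the number of
    # CPU workers: n_jobs=-1 uses all cpu_count() workers, n_jobs=-2 all but
    # one, and so on.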
    # if the arguments come in tuple format
    if type(iterable) is tuple:
        # the function is always the first element
        f = iterable[0]
        # get the names of the formal arguments of f
        formal_args = inspect.getfullargspec(f).args
        # build the argument list
        args = []
        # positional arguments are in iterable[1]
        args = list(iterable[1])
        # keyword arguments are in iterable[2]
        for i in range(len(formal_args)):
            for j in iterable[2].keys():
                if j == formal_args[i]:
                    args.append(iterable[2][j])
        # lengths used to check that all arrays have the same size
        l_arr = []
        # list of Future results
        L_fut = []
        # split the vectors
        args_split = []
        for i in range(len(args)):
            args_split.append([])
            # if the argument is a numpy array
            if has_numpy and type(args[i]) is np.ndarray:
                # one-dimensional array
                if args[i].ndim == 1:
                    # split the numpy array
                    args_split[i] = np.array_split(args[i], n_block)
                    # record the length of the numpy array
                    l_arr.append(args[i].size)
                # two-dimensional array
                elif args[i].ndim == 2:
                    # split the numpy 2D array
                    args_split[i] = array2d_split(args[i], n_block)
            # if the argument is a generator
            elif isinstance(args[i], types.GeneratorType):
                # split the generator
                args_split[i] = partition(list(args[i]), n_block)
                # record the length of the generator
                l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
        if len(set(l_arr)) > 1:
            raise SystemExit('Error: all arrays should have the same size')
        for i in range(n_block):
            # generate the argument list of the i-th task
            L_args = []
            sizebase = 0
            for j in range(len(args)):
                if (has_numpy and type(args[j]) is np.ndarray) or isinstance(args[j], types.GeneratorType):
                    L_args.append(args_split[j][i])
                    if sizebase == 0:
                        sizebase = len(args_split[j][i])
                    elif sizebase == len(args_split[j][i]):
                        continue
                    else:
                        raise SystemExit('Error: all arrays should be split into equal size')
                else:
                    L_args.append(args[j])
            fut = starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'],
                                     priority=dict_task['priority'], color=dict_task['color'],
                                     flops=dict_task['flops'], perfmodel=dict_task['perfmodel'],
                                     sizebase=sizebase)(f, *L_args)
            L_fut.append(fut)
        return L_fut
    # if iterable is a generator or a list of (function, args) pairs
    else:
        L = list(iterable)
        # helper executed inside a task: call each function of a sub-list on its arguments
        def lf(ls):
            L_func = []
            for i in range(len(ls)):
                # the first element is the function
                f = ls[i][0]
                # the second element is the args, of type tuple
                L_args = list(ls[i][1])
                # call the function and collect its result
                L_func.append(f(*L_args))
            return L_func
        # split the list of calls into sub-lists
        L_split = partition(L, n_block)
        # submit one task per sub-list
        L_fut = []
        for i in range(len(L_split)):
            sizebase = len(L_split[i])
            fut = starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'],
                                     priority=dict_task['priority'], color=dict_task['color'],
                                     flops=dict_task['flops'], perfmodel=dict_task['perfmodel'],
                                     sizebase=sizebase)(lf, L_split[i])
            L_fut.append(fut)
        return L_fut
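
# Note (illustrative): for a call such as Parallel(n_jobs=2)(delayed(f)(np.arange(10))),
# iterable is the tuple (f, (np.arange(10),), {}), so the tuple branch above
# splits the array into n_block chunks and submits one StarPU task per chunk;
# f must therefore accept a sub-array. For
# Parallel(n_jobs=2)(delayed(f)(i) for i in range(10)), the else branch instead
# groups the individual calls into n_block sub-lists.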

class Parallel(object):
    def __init__(self, mode="normal", perfmodel=None, end_msg=None,
                 name=None, synchronous=0, priority=0, color=None, flops=None,
                 n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',
                 batch_size='auto', temp_folder=None, max_nbytes='1M',
                 mmap_mode='r', prefer=None, require=None):
        #active_backend = get_active_backend()
        #nesting_level = active_backend.nesting_level
        #if backend is None:
        #    backend = active_backend
        #else:
        #    try:
        #        backend_factory = BACKENDS[backend]
        #    except KeyError as e:
        #        raise ValueError("Invalid backend: %s, expected one of %r"
        #                         % (backend, sorted(BACKENDS.keys()))) from e
        #    backend = backend_factory(nesting_level=nesting_level)
        if n_jobs is None:
            n_jobs = 1
        self.mode = mode
        self.perfmodel = perfmodel
        self.end_msg = end_msg
        self.name = name
        self.synchronous = synchronous
        self.priority = priority
        self.color = color
        self.flops = flops
        self.n_jobs = n_jobs
        self._backend = backend

    def print_progress(self):
        # print the number of tasks submitted to StarPU and not yet completed
        print("", starpupy.task_nsubmitted())

    def __call__(self, iterable):
        # generate the dictionary of task_submit options
        dict_task = {'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority, 'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
        if hasattr(self._backend, 'start_call'):
            self._backend.start_call()
        # in normal mode, the user can call the function directly without using async
        if self.mode == "normal":
            async def asy_main():
                L_fut = future_generator(iterable, self.n_jobs, dict_task)
                res = []
                for i in range(len(L_fut)):
                    L_res = await L_fut[i]
                    res.extend(L_res)
                return res
            loop = asyncio.get_event_loop()
            results = loop.run_until_complete(asy_main())
            retVal = results
        # in future mode, the user relies on the asyncio module and awaits the Future result in the main function
        elif self.mode == "future":
            L_fut = future_generator(iterable, self.n_jobs, dict_task)
            fut = asyncio.gather(*L_fut)
            if self.end_msg is not None:
                fut.add_done_callback(functools.partial(print, self.end_msg))
            retVal = fut
        if hasattr(self._backend, 'stop_call'):
            self._backend.stop_call()
        return retVal

def delayed(function):
    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    return delayed_function
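
# Usage sketch (illustrative; assumes StarPU was initialized by importing starpu):
#
#   from math import sqrt
#   results = Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(8))
#   # results == [0.0, 1.0, ..., 7.0]
#
# In "future" mode the call returns an asyncio future to be awaited by the caller:
#
#   fut = Parallel(mode="future", n_jobs=2)(delayed(sqrt)(i**2) for i in range(8))
#   res = await fut   # inside a coroutine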

######################################################################
__version__ = jl.__version__

class Memory(jl.Memory):
    def __init__(self, location=None, backend='local', cachedir=None,
                 mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
                 backend_options=None):
        # forward the actual arguments rather than hard-coded defaults
        super(Memory, self).__init__(location=location, backend=backend, cachedir=cachedir,
                                     mmap_mode=mmap_mode, compress=compress, verbose=verbose,
                                     bytes_limit=bytes_limit, backend_options=backend_options)

def dump(value, filename, compress=0, protocol=None, cache_size=None):
    return jl.dump(value, filename, compress, protocol, cache_size)

def load(filename, mmap_mode=None):
    return jl.load(filename, mmap_mode)

def hash(obj, hash_name='md5', coerce_mmap=False):
    return jl.hash(obj, hash_name, coerce_mmap)

def register_compressor(compressor_name, compressor, force=False):
    return jl.register_compressor(compressor_name, compressor, force)
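
# These thin wrappers re-export joblib's persistence helpers so user code can
# keep a single import, e.g. (illustrative):
#   dump([1, 2, 3], '/tmp/data.pkl')
#   data = load('/tmp/data.pkl')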

def effective_n_jobs(n_jobs=-1):
    return cpu_count()

def get_active_backend():
    backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
    if backend_and_jobs is not None:
        backend, n_jobs = backend_and_jobs
        return backend
    # fall back to the 'loky' backend, which must have been registered in BACKENDS
    backend = BACKENDS['loky'](nesting_level=0)
    return backend

class parallel_backend(object):
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):
        if isinstance(backend, str):
            backend = BACKENDS[backend](**backend_params)

        current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
        if backend.nesting_level is None:
            if current_backend_and_jobs is None:
                nesting_level = 0
            else:
                nesting_level = current_backend_and_jobs[0].nesting_level
            backend.nesting_level = nesting_level

        # save the backends info and set the active backend
        self.old_backend_and_jobs = current_backend_and_jobs
        self.new_backend_and_jobs = (backend, n_jobs)
        _backend.backend_and_jobs = (backend, n_jobs)

    def __enter__(self):
        return self.new_backend_and_jobs

    def __exit__(self, type, value, traceback):
        self.unregister()

    def unregister(self):
        if self.old_backend_and_jobs is None:
            if getattr(_backend, 'backend_and_jobs', None) is not None:
                del _backend.backend_and_jobs
        else:
            _backend.backend_and_jobs = self.old_backend_and_jobs

def register_parallel_backend(name, factory):
    BACKENDS[name] = factory
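
# Usage sketch (illustrative; SomeBackend is a hypothetical ParallelBackendBase
# subclass):
#   register_parallel_backend('custom', SomeBackend)
#   with parallel_backend('custom', n_jobs=2):
#       ...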