joblib.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import sys
  17. import types
  18. import joblib as jl
  19. from joblib import logger
  20. from joblib._parallel_backends import ParallelBackendBase
  21. from starpu import starpupy
  22. import starpu
  23. import asyncio
  24. import math
  25. import functools
  26. try:
  27. import numpy as np
  28. has_numpy=True
  29. except:
  30. has_numpy=False
  31. import inspect
  32. import threading
  # Event loop used by Parallel in "normal" mode to wait for StarPU futures.
  try:
      loop = asyncio.get_event_loop()
  except RuntimeError:
      # get_event_loop() may raise when no loop is set (newer Python versions
      # no longer create one implicitly); create and install one ourselves.
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)
  # True once nest_asyncio has been applied, which is required to call
  # run_until_complete() while the loop is already running (for instance
  # under "python -m asyncio").  Always defined, so later checks of
  # `has_nest` cannot fail with a NameError.
  has_nest = False
  if loop.is_running():
      try:
          import nest_asyncio
          nest_asyncio.apply()
          has_nest = True
      except ModuleNotFoundError:
          has_nest = False
  # Registry of parallel backend factories keyed by backend name; filled by
  # register_parallel_backend() and looked up by parallel_backend().
  BACKENDS = dict()
  # Thread-local slot holding the active (backend, n_jobs) pair, if any.
  _backend = threading.local()
  def cpu_count():
      """Return the number of CPU workers controlled by StarPU.

      The value is queried from ``starpupy.cpu_worker_get_count()`` on every
      call.
      """
      return starpupy.cpu_worker_get_count()
  def partition(ls, n_block):
      """Split the sequence *ls* into contiguous slices, at most *n_block* of them.

      When ``len(ls) >= n_block``, exactly ``n_block`` slices are returned.
      Their lengths differ by at most one, and the longer slices come first.
      When *ls* is shorter than ``n_block``, every element becomes its own
      one-element slice, so only ``len(ls)`` slices are returned.

      :param ls: a sliceable sequence (list, tuple, str, ...)
      :param n_block: requested number of slices (``0`` raises
          ``ZeroDivisionError``)
      :return: list of slices of ``ls``, in order
      """
      total = len(ls)
      if total < n_block:
          return [ls[pos:pos + 1] for pos in range(total)]
      base, extra = divmod(total, n_block)
      pieces = []
      start = 0
      for index in range(n_block):
          # the first `extra` slices each absorb one element of the remainder
          width = base + 1 if index < extra else base
          pieces.append(ls[start:start + width])
          start += width
      return pieces
  def array2d_split(a, n_block):
      """Split the 2-D numpy array *a* into *n_block* sub-arrays laid out on a grid.

      ``n_block`` is factored as ``n_row * n_col``, where ``n_row`` is the
      largest divisor of ``n_block`` that does not exceed ``sqrt(n_block)``.
      *a* is first cut along axis 0 into ``n_row`` horizontal bands.  Each band
      is then cut along axis 1 into ``n_col`` pieces.  Both cuts use
      ``np.array_split``, so the pieces may have uneven sizes.  Pieces are
      returned band by band, left to right.

      :param a: array to split (indexed along axes 0 and 1)
      :param n_block: number of pieces wanted (expected to be >= 1)
      :return: list of ``n_block`` sub-arrays
      """
      for divisor in range(math.floor(math.sqrt(n_block)), 0, -1):
          if n_block % divisor == 0:
              n_row = divisor
              break
      n_col = int(n_block / n_row)
      pieces = []
      for band in np.array_split(a, n_row, 0):
          pieces.extend(np.array_split(band, n_col, 1))
      return pieces
  def future_generator(iterable, n_jobs, dict_task):
      """Submit StarPU tasks for *iterable* and return their futures.

      Two input forms are handled:

      * A single ``(function, args, kwargs)`` tuple, as built by
        ``delayed(function)(...)``.  Every 1-D/2-D numpy array and every
        generator among the arguments is split into ``n_block`` pieces.  One
        task per block calls ``function`` on the i-th pieces; other arguments
        are passed unchanged.
      * Any other iterable of such tuples.  The calls are grouped into at most
        ``n_block`` batches with ``partition``.  One task per batch runs its
        calls and returns the list of their results.

      :param iterable: a ``delayed`` tuple, or an iterable of them
      :param n_jobs: requested number of blocks.  ``n_jobs < 0`` means
          ``cpu_count() + 1 + n_jobs`` blocks, and any resulting value
          ``<= 0`` is raised to 1.
      :param dict_task: options forwarded to ``starpu.task_submit``; must
          provide the keys ``name``, ``synchronous``, ``priority``, ``color``,
          ``flops`` and ``perfmodel``
      :return: list of futures, one per submitted task
      :raises SystemExit: if ``n_jobs`` is outside
          ``[-cpu_count() - 1, cpu_count()]``, or if the split arguments do
          not have matching sizes
      """
      n_cpus = cpu_count()
      if n_jobs < -n_cpus - 1 or n_jobs > n_cpus:
          raise SystemExit('Error: n_jobs is out of range')
      elif n_jobs < 0:
          n_block = n_cpus + 1 + n_jobs
      else:
          n_block = n_jobs
      if n_block <= 0:
          n_block = 1

      def submit(sizebase, *call):
          # Submit one StarPU task running call[0](*call[1:]).
          # `sizebase` is the length of this task's data chunk.
          return starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'],
                                    priority=dict_task['priority'], color=dict_task['color'],
                                    flops=dict_task['flops'], perfmodel=dict_task['perfmodel'],
                                    sizebase=sizebase)(*call)

      if type(iterable) is tuple:
          f = iterable[0]
          formal_args = inspect.getfullargspec(f).args
          kwargs = iterable[2] if len(iterable) > 2 else {}
          # Keyword arguments become positional ones, in the order of f's
          # formal parameters, so that they can be split like the others.
          # NOTE(review): keyword-only parameters and names absent from the
          # signature are dropped.  A keyword that skips an earlier defaulted
          # parameter lands in that parameter's slot.  Confirm whether
          # starpu.task_submit can forward keyword arguments instead.
          args = list(iterable[1])
          args.extend(kwargs[arg_name] for arg_name in formal_args if arg_name in kwargs)

          # total sizes of the 1-D arrays / generators; they must all agree
          l_arr = []
          # per argument: its list of pieces (empty list when not split)
          args_split = []
          for arg in args:
              pieces = []
              if has_numpy and type(arg) is np.ndarray:
                  if arg.ndim == 1:
                      pieces = np.array_split(arg, n_block)
                      l_arr.append(arg.size)
                  elif arg.ndim == 2:
                      pieces = array2d_split(arg, n_block)
              elif isinstance(arg, types.GeneratorType):
                  pieces = partition(list(arg), n_block)
                  l_arr.append(sum(len(piece) for piece in pieces))
                  # partition() returns fewer than n_block pieces for a short
                  # input.  Pad with empty lists, as np.array_split does for
                  # arrays, so that every block below has a piece to index.
                  pieces.extend([] for _ in range(n_block - len(pieces)))
              args_split.append(pieces)
              if len(set(l_arr)) > 1:
                  raise SystemExit('Error: all arrays should have the same size')

          L_fut = []
          for i in range(n_block):
              L_args = []
              # length of this block's pieces; split arguments must agree on it
              sizebase = 0
              for arg, pieces in zip(args, args_split):
                  if (has_numpy and type(arg) is np.ndarray) or isinstance(arg, types.GeneratorType):
                      piece = pieces[i]
                      L_args.append(piece)
                      if sizebase == 0:
                          sizebase = len(piece)
                      elif sizebase != len(piece):
                          raise SystemExit('Error: all arrays should be split into equal size')
                  else:
                      L_args.append(arg)
              L_fut.append(submit(sizebase, f, *L_args))
          return L_fut

      # iterable is a generator or a list of delayed calls
      calls = list(iterable)

      def lf(ls):
          # Run one batch of delayed calls inside a StarPU task and return
          # their results as a list.
          L_func = []
          for call in ls:
              func, call_args = call[0], call[1]
              # delayed() also records keyword arguments; forward them too
              call_kwargs = call[2] if len(call) > 2 and call[2] is not None else {}
              L_func.append(func(*call_args, **call_kwargs))
          return L_func

      return [submit(len(batch), lf, batch) for batch in partition(calls, n_block)]
  class Parallel(object):
      """StarPU-based stand-in for ``joblib.Parallel``.

      Calling an instance on a ``delayed`` call, or on an iterable of them,
      submits StarPU tasks through ``future_generator``.  Results are collected
      according to ``mode``:

      * ``"normal"``: wait for every task on the module-level event loop and
        return one flat list, extended with each task's result in turn.
      * ``"future"``: return an ``asyncio.gather`` future over the tasks.  When
        ``end_msg`` is set, it is printed (followed by the future) once the
        gathered future is done.

      ``perfmodel``, ``name``, ``synchronous``, ``priority``, ``color`` and
      ``flops`` are passed on as task options.  ``backend`` may be any
      object; its ``start_call``/``stop_call`` methods are invoked around each
      call when present.  The remaining joblib arguments (``verbose``,
      ``timeout``, ``pre_dispatch``, ``batch_size``, ``temp_folder``,
      ``max_nbytes``, ``mmap_mode``, ``prefer``, ``require``) are accepted for
      API compatibility but ignored.
      """

      def __init__(self, mode="normal", perfmodel=None, end_msg=None,
                   name=None, synchronous=0, priority=0, color=None, flops=None,
                   n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',
                   batch_size='auto', temp_folder=None, max_nbytes='1M',
                   mmap_mode='r', prefer=None, require=None):
          self.mode = mode
          self.perfmodel = perfmodel
          self.end_msg = end_msg
          self.name = name
          self.synchronous = synchronous
          self.priority = priority
          self.color = color
          self.flops = flops
          # without an explicit n_jobs, everything runs as a single block
          self.n_jobs = 1 if n_jobs is None else n_jobs
          self._backend = backend

      def print_progress(self):
          """Print the value of ``starpupy.task_nsubmitted()``.

          Presumably this is the number of submitted tasks; confirm against
          starpupy.
          """
          print("", starpupy.task_nsubmitted())

      def __call__(self, iterable):
          """Run *iterable* on StarPU; see the class docstring for the modes.

          :param iterable: a ``delayed`` call or an iterable of them
          :return: the result list (``"normal"``) or a gathered future
              (``"future"``)
          :raises ValueError: if ``mode`` is neither ``"normal"`` nor
              ``"future"``
          """
          if self.mode not in ("normal", "future"):
              raise ValueError("Invalid mode: %r, expected 'normal' or 'future'" % (self.mode,))
          dict_task = {'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority,
                       'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
          if hasattr(self._backend, 'start_call'):
              self._backend.start_call()
          # stop_call() must run even when submitting or waiting fails.
          # In "future" mode it runs right after submission, not when the
          # tasks finish.
          try:
              if self.mode == "normal":
                  async def asy_main():
                      res = []
                      for fut in future_generator(iterable, self.n_jobs, dict_task):
                          res.extend(await fut)
                      return res
                  # run_until_complete() on an already running loop only works
                  # once nest_asyncio has been applied
                  if loop.is_running() and not has_nest:
                      raise starpupy.error("Can't find \'nest_asyncio\' module (consider running \"pip3 install nest_asyncio\" or try to remove \"-m asyncio\" when starting Python interpreter)")
                  return loop.run_until_complete(asy_main())
              fut = asyncio.gather(*future_generator(iterable, self.n_jobs, dict_task))
              if self.end_msg is not None:
                  fut.add_done_callback(functools.partial(print, self.end_msg))
              return fut
          finally:
              if hasattr(self._backend, 'stop_call'):
                  self._backend.stop_call()
  def delayed(function):
      """Wrap *function* so that calling the wrapper records the call instead of running it.

      ``delayed(f)(*args, **kwargs)`` returns the tuple ``(f, args, kwargs)``,
      which is the form ``Parallel`` consumes.
      """
      def delayed_function(*args, **kwargs):
          call = (function, args, kwargs)
          return call
      return delayed_function
  257. ######################################################################
  # re-export joblib's version string
  __version__ = jl.__version__


  class Memory(jl.Memory):
      """``joblib.Memory`` subclass that forwards its constructor arguments to joblib.

      ``cachedir`` and ``bytes_limit`` are forwarded only when they are not
      ``None``.
      """

      def __init__(self, location=None, backend='local', cachedir=None,
                   mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
                   backend_options=None):
          kwargs = dict(location=location, backend=backend, mmap_mode=mmap_mode,
                        compress=compress, verbose=verbose,
                        backend_options=backend_options)
          # `cachedir` and `bytes_limit` are legacy joblib arguments that newer
          # joblib releases may not accept; only pass them when actually given.
          if cachedir is not None:
              kwargs['cachedir'] = cachedir
          if bytes_limit is not None:
              kwargs['bytes_limit'] = bytes_limit
          super(Memory, self).__init__(**kwargs)
  def dump(value, filename, compress=0, protocol=None, cache_size=None):
      """Persist *value* to *filename*; thin wrapper over ``joblib.dump``."""
      return jl.dump(value, filename, compress=compress, protocol=protocol,
                     cache_size=cache_size)
  def load(filename, mmap_mode=None):
      """Reload an object stored in *filename*; thin wrapper over ``joblib.load``."""
      return jl.load(filename, mmap_mode=mmap_mode)
  def hash(obj, hash_name='md5', coerce_mmap=False):
      """Return the joblib hash of *obj*; thin wrapper over ``joblib.hash``."""
      return jl.hash(obj, hash_name=hash_name, coerce_mmap=coerce_mmap)
  def register_compressor(compressor_name, compressor, force=False):
      """Register *compressor* under *compressor_name* via ``joblib.register_compressor``."""
      return jl.register_compressor(compressor_name, compressor, force=force)
  def effective_n_jobs(n_jobs=-1):
      """Return the number of parallel blocks that *n_jobs* stands for.

      This follows the same rule ``future_generator`` uses to choose its block
      count:

      * a negative value counts back from the number of StarPU CPU workers
        (``-1`` means all of them);
      * ``None`` means 1, the ``Parallel`` default;
      * the result is kept within ``[1, cpu_count()]``.

      The default ``n_jobs=-1`` therefore still returns ``cpu_count()``.

      :param n_jobs: requested number of jobs (joblib convention)
      :return: effective number of parallel blocks, at least 1
      """
      n_cpus = cpu_count()
      if n_jobs is None:
          n_jobs = 1
      elif n_jobs < 0:
          n_jobs = n_cpus + 1 + n_jobs
      return max(1, min(n_jobs, n_cpus))
  def get_active_backend():
      """Return the backend installed by the innermost active ``parallel_backend``.

      The lookup is per thread (``_backend`` is thread-local).  Outside any
      ``parallel_backend`` context, a fresh backend is built from the factory
      registered under ``'loky'``, with ``nesting_level=0``.  When no such
      factory is registered, ``None`` is returned.
      """
      backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
      if backend_and_jobs is not None:
          backend, _ = backend_and_jobs
          return backend
      # The default factory is looked up by its string key.  Nothing is
      # registered under it unless register_parallel_backend('loky', ...)
      # has been called.
      factory = BACKENDS.get('loky')
      if factory is None:
          return None
      return factory(nesting_level=0)
  class parallel_backend(object):
      """Context manager making *backend* the active backend of the current thread.

      *backend* is either a backend instance or the name of a factory in
      ``BACKENDS``; a named factory is called with ``**backend_params``.  The
      ``(backend, n_jobs)`` pair is installed as soon as the object is
      constructed and is returned by ``__enter__``.  Leaving the ``with``
      block, or calling ``unregister``, restores whatever pair was active
      before.  ``inner_max_num_threads`` is accepted for joblib compatibility
      but not used.
      """

      def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                   **backend_params):
          if isinstance(backend, str):
              factory = BACKENDS[backend]
              backend = factory(**backend_params)
          previous = getattr(_backend, 'backend_and_jobs', None)
          # a backend without an explicit nesting level inherits the level of
          # the backend it replaces (0 at top level)
          if backend.nesting_level is None:
              backend.nesting_level = 0 if previous is None else previous[0].nesting_level
          # remember what to restore on exit, then activate the new pair
          self.old_backend_and_jobs = previous
          self.new_backend_and_jobs = (backend, n_jobs)
          _backend.backend_and_jobs = (backend, n_jobs)

      def __enter__(self):
          return self.new_backend_and_jobs

      def __exit__(self, type, value, traceback):
          self.unregister()

      def unregister(self):
          """Restore the (backend, n_jobs) pair that was active before this one."""
          previous = self.old_backend_and_jobs
          if previous is not None:
              _backend.backend_and_jobs = previous
          elif getattr(_backend, 'backend_and_jobs', None) is not None:
              del _backend.backend_and_jobs
  def register_parallel_backend(name, factory):
      """Make *factory* available under *name* in the ``BACKENDS`` registry.

      An existing entry with the same name is replaced.
      """
      BACKENDS.update({name: factory})