joblib.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. import sys
  17. import types
  18. import joblib as jl
  19. from joblib import logger
  20. from joblib._parallel_backends import ParallelBackendBase
  21. from starpu import starpupy
  22. import starpu
  23. import asyncio
  24. import math
  25. import functools
  26. import numpy as np
  27. import inspect
  28. import threading
# Registry of selectable parallel backends, mapping a backend name to its
# factory; populated via register_parallel_backend() (currently empty).
BACKENDS={
#'loky': LokyBackend,
}
# Thread-local storage for the active (backend, n_jobs) pair set by
# parallel_backend() and read back by get_active_backend().
_backend = threading.local()
  33. # get the number of CPUs controlled by StarPU
  34. def cpu_count():
  35. n_cpus=starpupy.cpu_worker_get_count()
  36. return n_cpus
  37. # split a list ls into n_block numbers of sub-lists
  38. def partition(ls, n_block):
  39. if len(ls)>=n_block:
  40. # there are n1 sub-lists which contain q1 elements, and (n_block-n1) sublists which contain q2 elements (n1 can be 0)
  41. q1=math.ceil(len(ls)/n_block)
  42. q2=math.floor(len(ls)/n_block)
  43. n1=len(ls)%n_block
  44. #n2=n_block-n1
  45. # generate n1 sub-lists in L1, and (n_block-n1) sub-lists in L2
  46. L1=[ls[i:i+q1] for i in range(0, n1*q1, q1)]
  47. L2=[ls[i:i+q2] for i in range(n1*q1, len(ls), q2)]
  48. L=L1+L2
  49. else:
  50. # if the block number is larger than the length of list, each element in the list is a sub-list
  51. L=[ls[i:i+1] for i in range (len(ls))]
  52. return L
  53. # split a two-dimension numpy matrix into n_block numbers of sub-matrices
  54. def array2d_split(a, n_block):
  55. # decompose number of n_jobs to two integers multiply
  56. c_tmp=math.floor(math.sqrt(n_block))
  57. for i in range (c_tmp,0,-1):
  58. if n_block%i==0:
  59. c=i
  60. r=int(n_block/c)
  61. break
  62. # split column
  63. arr_split_c=np.array_split(a,c,0)
  64. arr_split=[]
  65. # split row
  66. for i in range(c):
  67. arr_split_r=np.array_split(arr_split_c[i],r,1)
  68. for j in range(r):
  69. arr_split.append(arr_split_r[j])
  70. return arr_split
  71. def future_generator(iterable, n_jobs, dict_task):
  72. # iterable is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
  73. #print("iterable type is ", type(iterable))
  74. #print("iterable is", iterable)
  75. # get the number of block
  76. if n_jobs<-cpu_count()-1 or n_jobs>cpu_count():
  77. raise SystemExit('Error: n_jobs is out of range')
  78. #print("Error: n_jobs is out of range, number of CPUs is", cpu_count())
  79. elif n_jobs<0:
  80. n_block=cpu_count()+1+n_jobs
  81. else:
  82. n_block=n_jobs
  83. # if arguments is tuple format
  84. if type(iterable) is tuple:
  85. # the function is always the first element
  86. f=iterable[0]
  87. # get the name of formal arguments of f
  88. formal_args=inspect.getargspec(f).args
  89. # get the arguments list
  90. args=[]
  91. # argument is arbitrary in iterable[1]
  92. args=list(iterable[1])
  93. # argument is keyword argument in iterable[2]
  94. for i in range(len(formal_args)):
  95. for j in iterable[2].keys():
  96. if j==formal_args[i]:
  97. args.append(iterable[2][j])
  98. # check whether all arrays have the same size
  99. l_arr=[]
  100. # list of Future result
  101. L_fut=[]
  102. # split the vector
  103. args_split=[]
  104. for i in range(len(args)):
  105. args_split.append([])
  106. # if the array is an numpy array
  107. if type(args[i]) is np.ndarray:
  108. # one-dimension matrix
  109. if args[i].ndim==1:
  110. # split numpy array
  111. args_split[i]=np.array_split(args[i],n_block)
  112. # get the length of numpy array
  113. l_arr.append(args[i].size)
  114. # two-dimension matrix
  115. elif args[i].ndim==2:
  116. # split numpy 2D array
  117. args_split[i]=array2d_split(args[i],n_block)
  118. # if the array is a generator
  119. elif isinstance(args[i],types.GeneratorType):
  120. # split generator
  121. args_split[i]=partition(list(args[i]),n_block)
  122. # get the length of generator
  123. l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
  124. if len(set(l_arr))>1:
  125. raise SystemExit('Error: all arrays should have the same size')
  126. #print("args list is", args_split)
  127. for i in range(n_block):
  128. # generate the argument list
  129. L_args=[]
  130. for j in range(len(args)):
  131. if type(args[j]) is np.ndarray or isinstance(args[j],types.GeneratorType):
  132. L_args.append(args_split[j][i])
  133. else:
  134. L_args.append(args[j])
  135. #print("L_args is", L_args)
  136. fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
  137. color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
  138. (f, *L_args)
  139. L_fut.append(fut)
  140. return L_fut
  141. # if iterable is a generator or a list of function
  142. else:
  143. L=list(iterable)
  144. #print(L)
  145. # generate a list of function according to iterable
  146. def lf(ls):
  147. L_func=[]
  148. for i in range(len(ls)):
  149. # the first element is the function
  150. f=ls[i][0]
  151. # the second element is the args list of a type tuple
  152. L_args=list(ls[i][1])
  153. # generate a list of function
  154. L_func.append(f(*L_args))
  155. return L_func
  156. # generate the split function list
  157. L_split=partition(L,n_block)
  158. # operation in each split list
  159. L_fut=[]
  160. for i in range(len(L_split)):
  161. fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
  162. color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
  163. (lf, L_split[i])
  164. L_fut.append(fut)
  165. return L_fut
  166. class Parallel(object):
  167. def __init__(self, mode="normal", perfmodel=None, end_msg=None,\
  168. name=None, synchronous=0, priority=0, color=None, flops=None,\
  169. n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
  170. batch_size='auto', temp_folder=None, max_nbytes='1M',\
  171. mmap_mode='r', prefer=None, require=None):
  172. #active_backend= get_active_backend()
  173. # nesting_level = active_backend.nesting_level
  174. # if backend is None:
  175. # backend = active_backend
  176. # else:
  177. # try:
  178. # backend_factory = BACKENDS[backend]
  179. # except KeyError as e:
  180. # raise ValueError("Invalid backend: %s, expected one of %r"
  181. # % (backend, sorted(BACKENDS.keys()))) from e
  182. # backend = backend_factory(nesting_level=nesting_level)
  183. if n_jobs is None:
  184. n_jobs = 1
  185. self.mode=mode
  186. self.perfmodel=perfmodel
  187. self.end_msg=end_msg
  188. self.name=name
  189. self.synchronous=synchronous
  190. self.priority=priority
  191. self.color=color
  192. self.flops=flops
  193. self.n_jobs=n_jobs
  194. self._backend=backend
  195. def print_progress(self):
  196. #pass
  197. print("", starpupy.task_nsubmitted())
  198. def __call__(self,iterable):
  199. #generate the dictionary of task_submit
  200. dict_task={'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority, 'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
  201. if hasattr(self._backend, 'start_call'):
  202. self._backend.start_call()
  203. # the mode normal, user can call the function directly without using async
  204. if self.mode=="normal":
  205. async def asy_main():
  206. L_fut=future_generator(iterable, self.n_jobs, dict_task)
  207. res=[]
  208. for i in range(len(L_fut)):
  209. L_res=await L_fut[i]
  210. res.extend(L_res)
  211. #print(res)
  212. #print("type of result is", type(res))
  213. return res
  214. #asyncio.run(asy_main())
  215. #retVal=asy_main
  216. loop = asyncio.get_event_loop()
  217. results = loop.run_until_complete(asy_main())
  218. retVal = results
  219. # the mode future, user needs to use asyncio module and await the Future result in main function
  220. elif self.mode=="future":
  221. L_fut=future_generator(iterable, self.n_jobs, dict_task)
  222. fut=asyncio.gather(*L_fut)
  223. if self.end_msg!=None:
  224. fut.add_done_callback(functools.partial(print, self.end_msg))
  225. retVal=fut
  226. if hasattr(self._backend, 'stop_call'):
  227. self._backend.stop_call()
  228. return retVal
  229. def delayed(function):
  230. def delayed_function(*args, **kwargs):
  231. return function, args, kwargs
  232. return delayed_function
######################################################################
# Re-export the wrapped joblib version so callers can introspect it.
__version__ = jl.__version__
  235. class Memory(jl.Memory):
  236. def __init__(self,location=None, backend='local', cachedir=None,
  237. mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
  238. backend_options=None):
  239. super(Memory, self).__init__(location=None, backend='local', cachedir=None,
  240. mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
  241. backend_options=None)
  242. def dump(value, filename, compress=0, protocol=None, cache_size=None):
  243. return jl.dump(value, filename, compress, protocol, cache_size)
  244. def load(filename, mmap_mode=None):
  245. return jl.load(filename, mmap_mode)
  246. def hash(obj, hash_name='md5', coerce_mmap=False):
  247. return jl.hash(obj, hash_name, coerce_mmap)
  248. def register_compressor(compressor_name, compressor, force=False):
  249. return jl.register_compressor(compressor_name, compressor, force)
  250. def effective_n_jobs(n_jobs=-1):
  251. return cpu_count()
  252. def get_active_backend():
  253. backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
  254. if backend_and_jobs is not None:
  255. backend,n_jobs=backend_and_jobs
  256. return backend
  257. backend = BACKENDS[loky](nesting_level=0)
  258. return backend
  259. class parallel_backend(object):
  260. def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
  261. **backend_params):
  262. if isinstance(backend, str):
  263. backend = BACKENDS[backend](**backend_params)
  264. current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
  265. if backend.nesting_level is None:
  266. if current_backend_and_jobs is None:
  267. nesting_level = 0
  268. else:
  269. nesting_level = current_backend_and_jobs[0].nesting_level
  270. backend.nesting_level = nesting_level
  271. # Save the backends info and set the active backend
  272. self.old_backend_and_jobs = current_backend_and_jobs
  273. self.new_backend_and_jobs = (backend, n_jobs)
  274. _backend.backend_and_jobs = (backend, n_jobs)
  275. def __enter__(self):
  276. return self.new_backend_and_jobs
  277. def __exit__(self, type, value, traceback):
  278. self.unregister()
  279. def unregister(self):
  280. if self.old_backend_and_jobs is None:
  281. if getattr(_backend, 'backend_and_jobs', None) is not None:
  282. del _backend.backend_and_jobs
  283. else:
  284. _backend.backend_and_jobs = self.old_backend_and_jobs
  285. def register_parallel_backend(name, factory):
  286. BACKENDS[name] = factory