# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
import sys
try:
    # remove any system-wide starpu module so that the local build is used
    sys.path.remove('/usr/local/lib/python3.8/site-packages/starpu')
except ValueError:
    pass
import types
import joblib as jl
from joblib import logger
from starpu import starpupy
import starpu
import asyncio
import math
import functools
import numpy as np
import inspect
import threading

BACKENDS = {}
_backend = threading.local()

# get the number of CPU workers controlled by StarPU
def cpu_count():
    n_cpus = starpupy.cpu_worker_get_count()
    return n_cpus
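
# For example, cpu_count() returns 4 when StarPU has been configured with
# four CPU workers; the value depends on the runtime configuration.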

# split the list ls into n_block sub-lists
def partition(ls, n_block):
    if len(ls) >= n_block:
        # n1 sub-lists contain q1 elements each, the remaining (n_block-n1)
        # sub-lists contain q2 elements each (n1 can be 0)
        q1 = math.ceil(len(ls)/n_block)
        q2 = math.floor(len(ls)/n_block)
        n1 = len(ls) % n_block
        # generate the n1 larger sub-lists in L1, then the (n_block-n1) smaller ones in L2
        L1 = [ls[i:i+q1] for i in range(0, n1*q1, q1)]
        L2 = [ls[i:i+q2] for i in range(n1*q1, len(ls), q2)]
        L = L1+L2
    else:
        # if the block number is larger than the length of the list,
        # each element of the list becomes its own sub-list
        L = [ls[i:i+1] for i in range(len(ls))]
    return L
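
# For example, partition(list(range(7)), 3) returns [[0, 1, 2], [3, 4], [5, 6]]:
# the first len(ls) % n_block sub-lists each receive one extra element.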

def future_generator(iterable, n_jobs, dict_task):
    # iterable is generated by the delayed function; converted to a list,
    # its format is [function, (arg1, arg2, ...), {kwargs}]
    # compute the number of blocks
    if n_jobs < -cpu_count()-1 or n_jobs > cpu_count():
        raise SystemExit('Error: n_jobs is out of range, number of CPUs is %d' % cpu_count())
    elif n_jobs < 0:
        n_block = cpu_count()+1+n_jobs
    else:
        n_block = n_jobs
    # if the arguments are given in tuple format
    if type(iterable) is tuple:
        # the function is always the first element
        f = iterable[0]
        # get the names of the formal arguments of f
        # (inspect.getargspec is deprecated and removed in Python 3.11)
        formal_args = inspect.getfullargspec(f).args
        # positional arguments are in iterable[1]
        args = list(iterable[1])
        # keyword arguments are in iterable[2]; append them in the order of the formal arguments
        for arg_name in formal_args:
            if arg_name in iterable[2]:
                args.append(iterable[2][arg_name])
        # lengths of the array arguments, used to check that they all have the same size
        l_arr = []
        # list of Future results
        L_fut = []
        # split each array argument into n_block chunks
        args_split = []
        for i in range(len(args)):
            args_split.append([])
            # if the argument is a NumPy array
            if type(args[i]) is np.ndarray:
                # split the NumPy array
                args_split[i] = np.array_split(args[i], n_block)
                # record the length of the NumPy array
                l_arr.append(args[i].size)
            # if the argument is a generator
            elif isinstance(args[i], types.GeneratorType):
                # materialize and split the generator
                args_split[i] = partition(list(args[i]), n_block)
                # record the total number of generated elements
                l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
        if len(set(l_arr)) > 1:
            raise SystemExit('Error: all arrays should have the same size')
        for i in range(n_block):
            # generate the argument list of this block
            L_args = []
            for j in range(len(args)):
                if type(args[j]) is np.ndarray or isinstance(args[j], types.GeneratorType):
                    L_args.append(args_split[j][i])
                else:
                    L_args.append(args[j])
            fut = starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'],
                                     priority=dict_task['priority'], color=dict_task['color'],
                                     flops=dict_task['flops'],
                                     perfmodel=dict_task['perfmodel'])(f, *L_args)
            L_fut.append(fut)
        return L_fut
    # if iterable is a generator or a list of (function, args, kwargs) tuples
    else:
        L = list(iterable)
        # apply each function of the list ls to its arguments and collect the results
        def lf(ls):
            L_func = []
            for i in range(len(ls)):
                # the first element is the function
                f = ls[i][0]
                # the second element is the tuple of positional arguments
                L_args = list(ls[i][1])
                # call the function and store its result
                L_func.append(f(*L_args))
            return L_func
        # split the list of calls into n_block chunks
        L_split = partition(L, n_block)
        # submit one task per chunk
        L_fut = []
        for i in range(len(L_split)):
            fut = starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'],
                                     priority=dict_task['priority'], color=dict_task['color'],
                                     flops=dict_task['flops'],
                                     perfmodel=dict_task['perfmodel'])(lf, L_split[i])
            L_fut.append(fut)
        return L_fut
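
# Sketch of a direct call to future_generator (hypothetical values; assumes a
# running StarPU session). Each returned future resolves to the results of one
# chunk:
#   dict_task = {'name': None, 'synchronous': 0, 'priority': 0,
#                'color': None, 'flops': None, 'perfmodel': None}
#   futs = future_generator(((lambda x: x**2), (np.arange(4),), {}), 2, dict_task)
#   # -> two StarPU futures, one per half of the input array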

class Parallel(object):
    def __init__(self, mode="normal", perfmodel=None, end_msg=None,
                 name=None, synchronous=0, priority=0, color=None, flops=None,
                 n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',
                 batch_size='auto', temp_folder=None, max_nbytes='1M',
                 mmap_mode='r', prefer=None, require=None):
        # timeout, pre_dispatch, batch_size, temp_folder, max_nbytes and
        # mmap_mode are accepted for joblib API compatibility but are not
        # used by the StarPU backend
        active_backend, context_n_jobs = get_active_backend(prefer=prefer, require=require, verbose=verbose)
        nesting_level = active_backend.nesting_level
        if backend is None:
            backend = active_backend
        else:
            try:
                backend_factory = BACKENDS[backend]
            except KeyError as e:
                raise ValueError("Invalid backend: %s, expected one of %r"
                                 % (backend, sorted(BACKENDS.keys()))) from e
            backend = backend_factory(nesting_level=nesting_level)
        if n_jobs is None:
            n_jobs = 1
        self.mode = mode
        self.perfmodel = perfmodel
        self.end_msg = end_msg
        self.name = name
        self.synchronous = synchronous
        self.priority = priority
        self.color = color
        self.flops = flops
        self.n_jobs = n_jobs
        self._backend = backend

    def print_progress(self):
        # print the number of submitted tasks which have not completed yet
        print("", starpupy.task_nsubmitted())

    def __call__(self, iterable):
        # generate the dictionary of options passed to task_submit
        dict_task = {'name': self.name, 'synchronous': self.synchronous, 'priority': self.priority,
                     'color': self.color, 'flops': self.flops, 'perfmodel': self.perfmodel}
        if hasattr(self._backend, 'start_call'):
            self._backend.start_call()
        # in normal mode the user can call the function directly, without using asyncio
        if self.mode == "normal":
            async def asy_main():
                L_fut = future_generator(iterable, self.n_jobs, dict_task)
                res = []
                for i in range(len(L_fut)):
                    L_res = await L_fut[i]
                    res.extend(L_res)
                return res
            loop = asyncio.get_event_loop()
            retVal = loop.run_until_complete(asy_main())
        # in future mode the user receives a Future and must await it with the asyncio module
        elif self.mode == "future":
            L_fut = future_generator(iterable, self.n_jobs, dict_task)
            fut = asyncio.gather(*L_fut)
            if self.end_msg is not None:
                fut.add_done_callback(functools.partial(print, self.end_msg))
            retVal = fut
        else:
            raise ValueError("Invalid mode: %s, expected 'normal' or 'future'" % self.mode)
        if hasattr(self._backend, 'stop_call'):
            self._backend.stop_call()
        return retVal
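
# Future-mode sketch (hypothetical; to be awaited from asyncio code):
#   fut = Parallel(mode="future", n_jobs=2)(delayed(math.sqrt)(i**2) for i in range(8))
#   res = await fut   # list with one result list per submitted chunk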

# delayed wraps a function call so that it is recorded as a
# (function, args, kwargs) tuple instead of being executed
def delayed(function):
    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    return delayed_function
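
# Normal-mode usage sketch, mirroring the joblib API:
#   results = Parallel(n_jobs=2)(delayed(math.sqrt)(i**2) for i in range(8))
#   # -> [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]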

######################################################################
__version__ = jl.__version__

class Memory(jl.Memory):
    def __init__(self, location=None, backend='local', cachedir=None,
                 mmap_mode=None, compress=False, verbose=1, bytes_limit=None,
                 backend_options=None):
        # forward the actual arguments instead of re-passing the defaults
        super(Memory, self).__init__(location=location, backend=backend, cachedir=cachedir,
                                     mmap_mode=mmap_mode, compress=compress, verbose=verbose,
                                     bytes_limit=bytes_limit, backend_options=backend_options)
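
# Caching sketch (hypothetical cache directory):
#   mem = Memory(location='/tmp/starpu_joblib_cache')
#   cached_f = mem.cache(f)   # joblib memoization of f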

def dump(value, filename, compress=0, protocol=None, cache_size=None):
    return jl.dump(value, filename, compress, protocol, cache_size)

def load(filename, mmap_mode=None):
    return jl.load(filename, mmap_mode)

def hash(obj, hash_name='md5', coerce_mmap=False):
    return jl.hash(obj, hash_name, coerce_mmap)

def register_compressor(compressor_name, compressor, force=False):
    return jl.register_compressor(compressor_name, compressor, force)
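
# Persistence sketch (these wrappers simply forward to joblib):
#   dump(np.arange(10), '/tmp/arr.pkl')
#   arr = load('/tmp/arr.pkl')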

def effective_n_jobs(n_jobs=-1):
    # with the StarPU backend, the effective number of jobs is the number of CPU workers
    return cpu_count()

def get_active_backend(prefer=None, require=None, verbose=0):
    return jl.parallel.get_active_backend(prefer, require, verbose)

class parallel_backend(object):
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):
        if isinstance(backend, str):
            backend = BACKENDS[backend](**backend_params)
        current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
        if backend.nesting_level is None:
            if current_backend_and_jobs is None:
                nesting_level = 0
            else:
                nesting_level = current_backend_and_jobs[0].nesting_level
            backend.nesting_level = nesting_level
        # save the backend info and set the active backend
        self.old_backend_and_jobs = current_backend_and_jobs
        self.new_backend_and_jobs = (backend, n_jobs)
        _backend.backend_and_jobs = (backend, n_jobs)

    def __enter__(self):
        return self.new_backend_and_jobs

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.unregister()

    def unregister(self):
        if self.old_backend_and_jobs is None:
            if getattr(_backend, 'backend_and_jobs', None) is not None:
                del _backend.backend_and_jobs
        else:
            _backend.backend_and_jobs = self.old_backend_and_jobs

def register_parallel_backend(name, factory):
    BACKENDS[name] = factory
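
# Registration sketch (MyBackendFactory is a hypothetical backend class):
#   register_parallel_backend('my_backend', MyBackendFactory)
#   with parallel_backend('my_backend', n_jobs=2) as (backend, n_jobs):
#       ...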
|