# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
from starpu import starpupy
import asyncio
import math
import os
import pickle
import json
import functools
import shlex

# get the number of CPUs controlled by StarPU
n_cpus = starpupy.cpu_worker_get_count()


# class perfmodel
class Perfmodel(object):
    """Hold the performance-model structure created for a given symbol.

    The structure is created with ``starpupy.init_perfmodel`` and released
    with ``starpupy.free_perfmodel`` when the object is destroyed.
    """

    def __init__(self, symbol):
        self.symbol = symbol
        self.pstruct = starpupy.init_perfmodel(self.symbol)

    def get_struct(self):
        """Return the structure obtained from ``starpupy.init_perfmodel``."""
        return self.pstruct

    def __del__(self):
        # If __init__ failed before pstruct was assigned there is nothing
        # to free; skipping avoids an AttributeError during finalization.
        if hasattr(self, "pstruct"):
            starpupy.free_perfmodel(self.pstruct)


# split a list ls into n_block numbers of sub-lists
def partition(ls, n_block):
    """Split ``ls`` into at most ``n_block`` contiguous sub-lists.

    When ``len(ls) >= n_block`` exactly ``n_block`` sub-lists are returned:
    the first ``len(ls) % n_block`` of them hold one more element than the
    remaining ones.  When the list is shorter than ``n_block``, every
    element becomes its own one-element sub-list.

    Raises:
        ValueError: if ``n_block`` is smaller than 1.
    """
    if n_block < 1:
        raise ValueError("n_block must be at least 1, got %r" % (n_block,))

    n_items = len(ls)
    if n_items < n_block:
        # if the block number is larger than the length of list, each
        # element in the list is a sub-list
        return [ls[i:i + 1] for i in range(n_items)]

    # n_large sub-lists contain (small + 1) elements, and the other
    # (n_block - n_large) sub-lists contain small elements (n_large can be 0)
    small, n_large = divmod(n_items, n_block)
    large = small + 1
    boundary = n_large * large
    blocks = [ls[i:i + large] for i in range(0, boundary, large)]
    blocks += [ls[i:i + small] for i in range(boundary, n_items, small)]
    return blocks


# generate the dictionary which contains the perfmodel symbol and its struct pointer
dict_perf = {}


def dict_perf_generator(perfsymbol):
    """Return the cached ``Perfmodel`` for ``perfsymbol``, creating it once."""
    p = dict_perf.get(perfsymbol)
    if p is None:
        p = Perfmodel(perfsymbol)
        dict_perf[perfsymbol] = p
    return p


def future_generator(g, n_jobs, perfsymbol):
    """Submit the delayed calls of ``g`` to StarPU, grouped in blocks.

    Args:
        g: iterable of ``(function, args_tuple)`` pairs, as produced by
            functions wrapped with ``delayed``.
        n_jobs: number of blocks the calls are split into.  A negative
            value means ``n_cpus + 1 + n_jobs`` (so ``-1`` uses ``n_cpus``
            blocks).
        perfsymbol: performance-model symbol, or ``None`` to submit the
            tasks without a performance model.

    Returns:
        The list of objects returned by ``starpupy.task_submit``, one per
        block.

    Raises:
        ValueError: if ``n_jobs`` is 0, greater than ``n_cpus`` or smaller
            than ``-n_cpus``.
    """
    # g is generated by delayed function, after converting to a list, the
    # format is [function, (arg1, arg2, ... ,)]
    calls = list(g)

    # run every (function, args) pair of one block and collect the results
    def lf(ls):
        return [item[0](*item[1]) for item in ls]

    # get the number of block; values that would lead to zero or a negative
    # number of blocks are rejected here instead of failing later on
    if n_jobs == 0 or n_jobs < -n_cpus or n_jobs > n_cpus:
        raise ValueError(
            "Error: n_jobs is out of range, number of CPUs is %d" % n_cpus)
    n_block = n_cpus + 1 + n_jobs if n_jobs < 0 else n_jobs

    # generate the split function list
    blocks = partition(calls, n_block)

    # one task per block
    futures = []
    for block in blocks:
        if perfsymbol is None:
            fut = starpupy.task_submit(lf, block)
        else:
            p = dict_perf_generator(perfsymbol)
            fut = starpupy.task_submit(lf, block, p.get_struct())
        futures.append(fut)
    return futures


def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,
             backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',
             batch_size='auto', temp_folder=None, max_nbytes='1M',
             mmap_mode='r', prefer=None, require=None):
    """Build a callable that runs an iterable of delayed calls through StarPU.

    Args:
        mode: ``"normal"`` to get a blocking callable that returns the flat
            list of results, or ``"future"`` to get a callable returning
            the ``asyncio.gather`` future of the per-block result lists
            (to be awaited by the caller inside a running event loop).
        n_jobs: number of blocks, see ``future_generator``.
        perfmodel: performance-model symbol or ``None``.
        end_msg: in ``"future"`` mode, when not ``None``, a done-callback
            is attached that prints this message followed by the future.

    The remaining keyword arguments are accepted but not used by this
    function.

    Raises:
        ValueError: if ``mode`` is neither ``"normal"`` nor ``"future"``.
    """
    # the mode normal, user can call the function directly without using async
    if mode == "normal":
        def parallel_normal(g):
            async def asy_main():
                # submit everything first, then await the blocks in order
                res = []
                for fut in future_generator(g, n_jobs, perfmodel):
                    res.extend(await fut)
                return res
            # hand the collected results back to the caller instead of
            # discarding them
            return asyncio.run(asy_main())
        return parallel_normal

    # the mode future, user needs to use asyncio module and await the Future
    # result in main function
    if mode == "future":
        def parallel_future(g):
            fut = asyncio.gather(*future_generator(g, n_jobs, perfmodel))
            if end_msg is not None:
                fut.add_done_callback(functools.partial(print, end_msg))
            return fut
        return parallel_future

    raise ValueError(
        "unknown mode %r, expected \"normal\" or \"future\"" % (mode,))


def delayed(f):
    """Wrap ``f`` so that calling it returns ``(f, args)`` instead of running it."""
    def delayed_func(*args):
        return f, args
    return delayed_func


######################################################################
# dump performance model
def perfmodel_plot(perfmodel):
    """Save the history-based model of ``perfmodel`` and display its plot.

    Runs ``starpu_perfmodel_plot``, ``gnuplot`` and ``gv`` through the
    shell; their exit statuses are ignored.

    Raises:
        KeyError: if no performance model was registered for ``perfmodel``.
    """
    p = dict_perf[perfmodel]
    starpupy.save_history_based_model(p.get_struct())
    # quote the symbol so that spaces or shell metacharacters in it cannot
    # break (or extend) the commands
    os.system('starpu_perfmodel_plot -s ' + shlex.quote(perfmodel))
    os.system('gnuplot ' + shlex.quote('starpu_' + perfmodel + '.gp'))
    os.system('gv ' + shlex.quote('starpu_' + perfmodel + '.eps'))