| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
- from starpu import starpupy
- import asyncio
- import math
- import os
- import pickle
- import json
- import functools
# Number of CPU workers currently managed by StarPU.
n_cpus = starpupy.cpu_worker_get_count()
class Perfmodel(object):
    """Thin wrapper around a StarPU performance-model C structure.

    Initialises the model for *symbol* via starpupy and frees the
    underlying structure when the Python object is collected.
    """

    def __init__(self, symbol):
        # Keep the symbol and the pointer to the C-side perfmodel struct.
        self.symbol = symbol
        self.pstruct = starpupy.init_perfmodel(self.symbol)

    def get_struct(self):
        """Return the underlying perfmodel struct pointer."""
        return self.pstruct

    def __del__(self):
        # Release the C-side structure together with the wrapper.
        starpupy.free_perfmodel(self.pstruct)
def partition(ls, n_block):
    """Split *ls* into ``n_block`` contiguous sub-lists.

    When ``len(ls) >= n_block``, the first ``len(ls) % n_block`` sub-lists
    hold ``ceil(len(ls)/n_block)`` elements and the remaining ones hold
    ``floor(len(ls)/n_block)`` elements.  When the list is shorter than
    ``n_block``, each element becomes its own single-element sub-list.
    """
    total = len(ls)
    if total < n_block:
        # More blocks requested than elements: one element per sub-list.
        return [ls[i:i + 1] for i in range(total)]
    big = math.ceil(total / n_block)     # size of the larger sub-lists
    small = math.floor(total / n_block)  # size of the smaller sub-lists
    n_big = total % n_block              # how many larger sub-lists (may be 0)
    cut = n_big * big                    # index where the small sub-lists start
    chunks = [ls[start:start + big] for start in range(0, cut, big)]
    chunks.extend(ls[start:start + small] for start in range(cut, total, small))
    return chunks
# Cache mapping a perfmodel symbol to its Perfmodel wrapper, so the
# C-side structure for a given symbol is initialised only once.
dict_perf = {}

def dict_perf_generator(perfsymbol):
    """Return the cached Perfmodel for *perfsymbol*, creating it on first use."""
    # Single lookup with `is None` instead of the original `== None`
    # comparison followed by a second subscript lookup.
    p = dict_perf.get(perfsymbol)
    if p is None:
        p = Perfmodel(perfsymbol)
        dict_perf[perfsymbol] = p
    return p
def future_generator(g, n_jobs, perfsymbol):
    """Split the delayed-call generator *g* into blocks and submit each
    block as one StarPU task.

    Parameters
    ----------
    g : iterable of (function, args) pairs, as produced by ``delayed``.
    n_jobs : int, number of blocks; negative values count down from the
        number of CPUs plus one (joblib convention: -1 means all CPUs).
    perfsymbol : str or None, symbol of the performance model to attach
        to the submitted tasks.

    Returns
    -------
    list of StarPU futures, one per submitted block.

    Raises
    ------
    ValueError : if n_jobs is outside [-n_cpus - 1, n_cpus].
    """
    # Each element of g is (function, (arg1, arg2, ...)).
    L = list(g)

    # Worker executed inside a StarPU task: run every (function, args)
    # pair of the sub-list and collect the results.
    def lf(ls):
        L_func = []
        for i in range(len(ls)):
            f = ls[i][0]             # the function to call
            L_args = list(ls[i][1])  # its argument tuple
            L_func.append(f(*L_args))
        return L_func

    # Translate n_jobs into a number of blocks.
    if n_jobs < -n_cpus - 1 or n_jobs > n_cpus:
        # The original printed an error and then crashed with a NameError
        # on the undefined n_block; raise a clear exception instead.
        raise ValueError("n_jobs is out of range, number of CPUs is %d" % n_cpus)
    elif n_jobs < 0:
        n_block = n_cpus + 1 + n_jobs
    else:
        n_block = n_jobs

    # Split the work and submit one StarPU task per sub-list.
    L_split = partition(L, n_block)
    L_fut = []
    for part in L_split:
        if perfsymbol is None:
            fut = starpupy.task_submit(lf, part)
        else:
            p = dict_perf_generator(perfsymbol)
            fut = starpupy.task_submit(lf, part, p.get_struct())
        L_fut.append(fut)
    return L_fut
def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,
             backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',
             batch_size='auto', temp_folder=None, max_nbytes='1M',
             mmap_mode='r', prefer=None, require=None):
    """Return a joblib-style Parallel executor backed by StarPU tasks.

    Parameters
    ----------
    mode : "normal" returns a callable that blocks and returns the list of
        results; "future" returns a callable that returns an awaitable
        aggregate Future for use with asyncio.
    n_jobs : int, forwarded to future_generator (negative values follow
        the joblib convention).
    perfmodel : str or None, perfmodel symbol attached to submitted tasks.
    end_msg : printed (with the finished future) when the gathered future
        completes, in "future" mode only.

    The remaining keyword arguments mirror joblib.Parallel's signature for
    API compatibility; they are currently ignored by this backend.
    """
    if mode == "normal":
        # Blocking flavour: run the event loop to completion and hand the
        # caller the flattened list of results.
        def parallel_normal(g):
            async def asy_main():
                L_fut = future_generator(g, n_jobs, perfmodel)
                res = []
                for fut in L_fut:
                    L_res = await fut
                    res.extend(L_res)
                return res
            # Bug fix: the original ran asyncio.run(asy_main()), discarded
            # the results, and returned the coroutine *function* asy_main.
            # Return the computed results instead.
            return asyncio.run(asy_main())
        return parallel_normal
    # Future flavour: the caller awaits the gathered Future itself.
    elif mode == "future":
        def parallel_future(g):
            L_fut = future_generator(g, n_jobs, perfmodel)
            fut = asyncio.gather(*L_fut)
            if end_msg is not None:
                # Print end_msg (followed by the finished future) once
                # every task has completed.
                fut.add_done_callback(functools.partial(print, end_msg))
            return fut
        return parallel_future
def delayed(f):
    """Defer a call to *f*: the returned callable captures its positional
    arguments and yields the pair ``(f, args)`` instead of calling *f*."""
    def _capture(*args):
        return f, args
    return _capture
######################################################################
def perfmodel_plot(perfmodel):
    """Save the history-based model for *perfmodel*, then generate and
    display its plot via starpu_perfmodel_plot, gnuplot and gv."""
    model = dict_perf[perfmodel]
    starpupy.save_history_based_model(model.get_struct())
    # NOTE(review): the symbol is interpolated straight into shell
    # commands; a symbol containing quotes would break or inject —
    # confirm perfmodel symbols are always trusted identifiers.
    os.system('starpu_perfmodel_plot -s "' + perfmodel +'"')
    os.system('gnuplot starpu_'+perfmodel+'.gp')
    os.system('gv starpu_'+perfmodel+'.eps')
|