123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- package priorities
- import (
- "encoding/json"
- "fmt"
- "strings"
- _ "github.com/go-sql-driver/mysql"
- client "github.com/influxdata/influxdb1-client/v2"
- "github.com/iwita/kube-scheduler/customcache"
- "k8s.io/klog"
- )
- var (
- customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
-
-
-
-
-
-
-
- CustomRequestedPriorityMap = customResourcePriority.PriorityMap
- )
- func customScoreFn(si scorerInput) float64 {
-
- return 1 / (si.metrics["mem_read"] + si.metrics["mem_write"])
- }
- func onlyIPC(metrics map[string]float64) float64 {
- return metrics["ipc"]
- }
- func onlyL3(metrics map[string]float64) float64 {
- return 1 / metrics["l3m"]
- }
- func onlyNrg(metrics map[string]float64) float64 {
- return 1 / metrics["procnrg"]
- }
- func calculateScore(si scorerInput,
- logicFn func(scorerInput) float64) float64 {
- res := logicFn(si)
-
- return res
- }
- func calculateWeightedAverage(response *client.Response,
- numberOfRows, numberOfMetrics int) (map[string]float64, error) {
-
- metrics := make(map[string]float64, numberOfMetrics)
- rows := response.Results[0].Series[0]
- for i := 1; i < len(rows.Columns); i++ {
- for j := 0; j < numberOfRows; j++ {
- val, err := rows.Values[j][i].(json.Number).Float64()
- if err != nil {
- klog.Infof("Error while calculating %v", rows.Columns[i])
- return nil, err
- }
- metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
- }
- metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
-
- }
-
- return metrics, nil
- }
- func customScoreInfluxDB(metrics []string, uuid string, socket,
- numberOfRows int, cfg Config, c client.Client) (map[string]float64, error) {
-
-
-
-
- columns := strings.Join(metrics, ", ")
-
- var command strings.Builder
- fmt.Fprintf(&command, "SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
-
-
- q := client.NewQuery(command.String(), cfg.Database.Name, "")
- response, err := c.Query(q)
- if err != nil {
- klog.Infof("Error while executing the query: %v", err.Error())
- return nil, err
- }
-
- return calculateWeightedAverage(response, numberOfRows, len(metrics))
- }
- func InvalidateCache() {
-
- select {
-
- case <-customcache.LabCache.Timeout.C:
- klog.Infof("Time to erase")
- klog.Infof("Cache: %v", customcache.LabCache.Cache)
-
- customcache.LabCache.CleanCache()
- default:
- }
- }
- func customResourceScorer(nodeName string) (float64, error) {
-
-
- cores, _ := Cores[nodeName]
- var results map[string]float64
-
- customcache.LabCache.Mux.Lock()
- ipc, ok := customcache.LabCache.Cache[nodeName]["ipc"]
- if !ok {
- klog.Infof("IPC is nil")
- }
- reads, ok := customcache.LabCache.Cache[nodeName]["mem_read"]
- if !ok {
- klog.Infof("Memory Reads is nil")
- }
- writes, ok := customcache.LabCache.Cache[nodeName]["mem_write"]
- if !ok {
- klog.Infof("Memory Writes is nil")
- }
- c6res, ok := customcache.LabCache.Cache[nodeName]["c6res"]
- if !ok {
- klog.Infof("C6 state is nil")
- }
- customcache.LabCache.Mux.Unlock()
-
- if ipc != -1 && reads != -1 && writes != -1 && c6res != -1 {
- results := map[string]float64{
- "ipc": ipc,
- "mem_read": reads,
- "mem_write": writes,
- "c6res": c6res,
- }
- klog.Infof("Found in the cache: ipc: %v, reads: %v, writes: %v", ipc, reads, writes)
- res := calculateScore(scorerInput{metrics: results}, customScoreFn)
- if sum := c6res * float64(len(cores)); sum < 1 {
-
- res = res * c6res
- } else {
- res = res * 1
- }
-
- speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
- maxFreq := maxSpeed[Nodes[nodeName]]
- res = res * float64(speed) * float64(maxFreq)
-
- klog.Infof("Using the cached values, Node name %s, has score %v\n", nodeName, res)
- return res, nil
- }
-
- var cfg Config
- err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
- if err != nil {
- return 0, err
- }
-
-
- c, err := connectToInfluxDB(cfg)
- if err != nil {
- return 0, err
- }
-
- defer c.Close()
-
- curr_uuid, ok := Nodes[nodeName]
- socket, _ := Sockets[nodeName]
-
- var socketNodes []string
- if ok {
- metrics := []string{"c6res"}
- time := 20
- numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
-
-
- for kubenode, s := range Sockets {
- if s == socket && Nodes[kubenode] == curr_uuid {
- socketNodes = append(socketNodes, kubenode)
- }
- }
- sum := 0
- socketSum := 0.0
- socketCores := 0.0
- for _, snode := range socketNodes {
- currCores, _ := Cores[snode]
- r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, currCores)
-
- if err != nil {
- klog.Infof("Error in querying or calculating core availability in the first stage: %v", err.Error())
- }
- average, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(currCores))
- if err != nil {
- klog.Infof("Error defining core availability")
- }
- if average["c6res"]*float64(len(currCores)) >= 1 {
- klog.Infof("Node %v has C6 sum: %v", snode, average["c6res"]*float64(len(currCores)))
- sum++
- }
- socketSum += average["c6res"] * float64(len(currCores))
- socketCores += float64(len(currCores))
- }
-
- results, err = customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
- if err != nil {
- klog.Infof("Error in querying or calculating average for the custom score in the first stage: %v", err.Error())
- return 0, nil
- }
- res := calculateScore(scorerInput{metrics: results}, customScoreFn)
-
- if sum < 1 {
- klog.Infof("Less than 1 node is available\nC6contribution: %v", socketSum/socketCores)
- res = res * socketSum / socketCores
- } else {
- res = res * 1
- }
-
- err = customcache.LabCache.UpdateCache(results, socketSum/socketCores, nodeName)
- if err != nil {
- klog.Infof(err.Error())
- } else {
- klog.Infof("Cache updated successfully for %v", nodeName)
- }
-
- speed := links[Nodes[nodeName]][0] * links[Nodes[nodeName]][1]
- maxFreq := maxSpeed[Nodes[nodeName]]
- res = res * float64(speed) * float64(maxFreq)
-
- klog.Infof("Node name %s, has score %v\n", nodeName, res)
- return res, nil
- } else {
- klog.Infof("Error finding the uuid: %v", ok)
- return 0, nil
- }
- }
|