123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package priorities
- import (
- "encoding/json"
- "fmt"
- "strings"
- _ "github.com/go-sql-driver/mysql"
- client "github.com/influxdata/influxdb1-client/v2"
- "k8s.io/klog"
- )
- var (
- nodeSelectionPriority = &CustomAllocationPriority{"NodeSelection", nodeSelectionScorer}
- //customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
- // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
- // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
- // prioritizes based on the minimum of the average of the fraction of requested to capacity.
- //
- // Details:
- // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
- NodeSelectionPriorityMap = nodeSelectionPriority.PriorityMap
- )
- // type Config struct {
- // Server struct {
- // Port string `yaml:"port"`
- // Host string `yaml:"host"`
- // } `yaml:"server"`
- // Database struct {
- // Type string `yaml:"type"`
- // Name string `yaml:"name"`
- // Username string `yaml:"username"`
- // Password string `yaml:"password"`
- // } `yaml:"database"`
- // MonitoringSpecs struct {
- // TimeInterval float32 `yaml:"interval"`
- // } `yaml:"monitoring"`
- // }
- // type Row struct {
- // ipc float32
- // l3m float32
- // reads float32
- // writes float32
- // c6res float32
- // }
- // type System struct {
- // ID int `json:"id"`
- // Uuid string `json:"uuid"`
- // numSockets int `json:"num_sockets"`
- // numCores int `json:"num_cores`
- // }
- // var nodes = map[string]string{
- // "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- // }
- // var sockets = map[string]int{
- // "kube-01": 0,
- // "kube-02": 0,
- // "kube-03": 0,
- // "kube-04": 0,
- // "kube-05": 0,
- // "kube-06": 1,
- // "kube-07": 0,
- // "kube-08": 1,
- // }
- var cores = map[string][]int{
- "kube-01": []int{},
- "kube-02": []int{},
- "kube-03": []int{},
- "kube-04": []int{},
- "kube-05": []int{0, 1, 2, 3},
- "kube-06": []int{12, 13, 14, 15, 16, 17, 18, 19},
- "kube-07": []int{4, 5, 6, 7, 8, 9, 10, 11, 24, 25, 26, 27, 28, 29, 30, 31},
- "kube-08": []int{20, 21, 22, 23, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47},
- }
- // func readFile(cfg *Config, file string) error {
- // f, err := os.Open(file)
- // if err != nil {
- // klog.Infof("Config file for scheduler not found. Error: %v", err)
- // return err
- // }
- // defer f.Close()
- // decoder := yaml.NewDecoder(f)
- // err = decoder.Decode(&cfg)
- // if err != nil {
- // klog.Infof("Unable to decode the config file. Error: %v", err)
- // return err
- // }
- // return nil
- // }
- type scorerInput struct {
- metricName string
- metrics map[string]float64
- }
- func OneScorer(si scorerInput) float64 {
- return si.metrics[si.metricName]
- }
- // func customScoreFn(metrics map[string]float64) float64 {
- // return metrics["ipc"] / metrics["mem_read"] * metrics["mem_write"]
- // }
- // func onlyIPC(metrics map[string]float64) float64 {
- // return metrics["ipc"]
- // }
- // func onlyL3(metrics map[string]float64) float64 {
- // return 1 / metrics["l3m"]
- // }
- // func onlyNrg(metrics map[string]float64) float64 {
- // return 1 / metrics["procnrg"]
- // }
- // func calculateScore(results map[string]float64,
- // logicFn func(map[string]float64) float64) float64 {
- // res := logicFn(results)
- // //klog.Infof("Has score (in float) %v\n", res)
- // return res
- // }
- func calculateWeightedAverageCores(response *client.Response,
- numberOfRows, numberOfMetrics, numberOfCores int) (map[string]float64, error) {
- // initialize the metrics map with a constant size
- metrics := make(map[string]float64, numberOfMetrics)
- rows := response.Results[0].Series[0]
- for i := 1; i < len(rows.Columns); i++ {
- klog.Infof("Name of column %v : %v\n range of values: %v", i, rows.Columns[i], len(rows.Values))
- for j := 0; j < numberOfRows; j++ {
- avg := 0.0
- for k := 0; k < numberOfCores; k++ {
- val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
- if err != nil {
- klog.Infof("Error while calculating %v", rows.Columns[i])
- return nil, err
- }
- //metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
- avg += val
- }
- metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
- }
- metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
- klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
- }
- // TODO better handling for the returning errors
- return metrics, nil
- }
- // func connectToInfluxDB(cfg Config) (client.Client, error) {
- // c, err := client.NewHTTPClient(client.HTTPConfig{
- // Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
- // })
- // if err != nil {
- // klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
- // return nil, err
- // }
- // klog.Infof("Connected Successfully to InfluxDB")
- // return c, nil
- // }
- // This function does the following:
- // 1. Queries the DB with the provided metrics and cores
- // 2. Calculates and returns the weighted average of each of those metrics
- func queryInfluxDbCores(metrics []string, uuid string, socket,
- time int, cfg Config, c client.Client, cores []int) (map[string]float64, error) {
- // calculate the number of rows needed
- // i.e. 20sec / 0.5s interval => 40rows
- numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
- // EDIT
- // This time we will fetch data for multiple cores
- // so we will need more rows, proportional to the core number
- numberOfRows *= len(cores)
- // merge all the required columns
- columns := strings.Join(metrics, ", ")
- // build the cores part of the command
- var coresPart strings.Builder
- fmt.Fprintf(&coresPart, "core_id='%d'", cores[0])
- for i := 1; i < len(cores); i++ {
- fmt.Fprintf(&coresPart, " or core_id='%d'", cores[i])
- }
- // build the coommand
- var command strings.Builder
- fmt.Fprintf(&command, "SELECT %s from core_metrics where uuid = '%s' and socket_id='%d' and %s order by time desc limit %d", columns, uuid, socket, coresPart.String(), numberOfRows)
- klog.Infof("The query is: %v", command.String())
- q := client.NewQuery(command.String(), cfg.Database.Name, "")
- response, err := c.Query(q)
- if err != nil {
- klog.Infof("Error while executing the query: %v", err.Error())
- return nil, err
- }
- // Calculate the average for the metrics provided
- return calculateWeightedAverageCores(response, numberOfRows, len(metrics), len(cores))
- }
- func nodeSelectionScorer(nodeName string) (float64, error) {
- //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
- //customRequestedScore(requested.Memory, allocable.Memory)) / 2
- //read database information
- var cfg Config
- err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
- if err != nil {
- return 0, err
- }
- /*-------------------------------------
- //TODO read also nodes to uuid mappings for EVOLVE
- -------------------------------------*/
- // InfluxDB
- c, err := connectToInfluxDB(cfg)
- if err != nil {
- return 0, err
- }
- // close the connection in the end of execution
- defer c.Close()
- //Get the uuid of this node in order to query in the database
- curr_uuid, ok := nodes[nodeName]
- socket, _ := sockets[nodeName]
- cores, _ := cores[nodeName]
- if len(cores) == 0 {
- return 0.0, nil
- }
- klog.Infof("Node %v has %v cores", nodeName, len(cores))
- if ok {
- // Select Socket
- results, err := queryInfluxDbCores([]string{"c6res"}, curr_uuid, socket, 20, cfg, c, cores)
- if err != nil {
- klog.Infof("Error in querying or calculating average: %v", err.Error())
- return 0, nil
- }
- res := calculateScore(results, OneScorer)
- // Select Node
- klog.Infof("Node name %s, has score %v\n", nodeName, res)
- return res, nil
- } else {
- klog.Infof("Error finding the uuid: %v", ok)
- return 0, nil
- }
- }
|