/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package priorities import ( "encoding/json" "fmt" "os" "strings" _ "github.com/go-sql-driver/mysql" client "github.com/influxdata/influxdb1-client/v2" "gopkg.in/yaml.v2" "k8s.io/klog" ) var ( customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer} // LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources. // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and // prioritizes based on the minimum of the average of the fraction of requested to capacity. // // Details: // (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2 CustomRequestedPriorityMap = customResourcePriority.PriorityMap ) type Config struct { Server struct { Port string `yaml:"port"` Host string `yaml:"host"` } `yaml:"server"` Database struct { Type string `yaml: "type"` Name string `yaml:"name"` Username string `yaml:"username"` Password string `yaml:"password"` } `yaml:"database"` MonitoringSpecs struct { TimeInterval float32 `yaml: "interval"` } `yaml: "monitoring"` } // type Row struct { // ipc float32 // l3m float32 // reads float32 // writes float32 // c6res float32 // } type System struct { ID int `json:"id"` Uuid string `json:"uuid"` numSockets int `json:"num_sockets"` numCores int `json:"num_cores` } var nodes = map[string]string{ "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002", } var sockets = map[string]int{ "kube-01": 0, "kube-02": 0, "kube-03": 0, "kube-04": 0, "kube-05": 0, "kube-06": 1, "kube-07": 0, "kube-08": 1, } func readFile(cfg *Config, file string) { f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml") if err != nil { panic(err.Error()) } defer f.Close() decoder := yaml.NewDecoder(f) err = decoder.Decode(&cfg) if err != nil { panic(err.Error()) } } func customScoreFn(metrics map[string]float64) float64 { return metrics["reads"] * metrics["writes"] / metrics["ipc"] } func calculateScore(results map[string]float64, logicFn func(map[string]float64) float64) int64 { res := logicFn(results) // TODO // While the final score should be an integer, // find a solution about resolving the float produced return int64(res) } func calculateWeightedAverage(response *client.Response, numberOfRows int, numberOfMetrics int) (map[string]float64, error) { // initialize the metrics map with a constant size metrics := make(map[string]float64, numberOfMetrics) rows := response.Results[0].Series[0] for i := 1; i < len(rows.Columns); i++ { for j := 0; j < numberOfRows; j++ { val, err := rows.Values[j][i].(json.Number).Float64() if err != nil { klog.Infof("Error while calculating %v", rows.Columns[i]) return nil, err } metrics[rows.Columns[i]] += val * float64(numberOfRows-j) } metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2)) } // TODO better handling for the returning errors return metrics, nil } func connectToInfluxDB(cfg Config) (client.Client, error) { c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "", }) if err != nil { klog.Infof("Error while connecting to InfluxDB: %v ", err.Error()) return nil, err } klog.Infof("Connected Successfully to InfluxDB") return c, nil } func queryInfluxDB(metrics []string, uuid string, time int, cfg Config, c client.Client) (map[string]float64, error) { // calculate the number of rows needed // i.e. 20sec / 0.5s interval => 40rows numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval) // merge all the required columns columns := strings.Join(metrics, ", ") // build the coommand command := fmt.Sprintf("SELECT %s from system_metrics where uuid = '%s' order by time desc limit %d", columns, uuid, numberOfRows) q := client.NewQuery(command, cfg.Database.Name, "") response, err := c.Query(q) if err != nil { klog.Infof("Error while executing the query: %v", err.Error()) return nil, err } // Calculate the average for the metrics provided return calculateWeightedAverage(response, numberOfRows, len(metrics)) } func customResourceScorer(nodeName string) (int64, error) { //return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) + //customRequestedScore(requested.Memory, allocable.Memory)) / 2 //read database information var cfg Config readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml") /*------------------------------------- //TODO read also nodes to uuid mappings -------------------------------------*/ // InfluxDB c, err := connectToInfluxDB(cfg) if err != nil { return 0, err } // close the connection in the end of execution defer c.Close() //Get the uuid of this node in order to query in the database curr_uuid, ok := nodes[nodeName] if ok { results, err := queryInfluxDB([]string{"ipc", "l3m", "c6res"}, curr_uuid, 20, cfg, c) if err != nil { klog.Infof("Error in querying or calculating average: %v", err.Error()) return 0, nil } res := calculateScore(results, customScoreFn) klog.Infof("Node name %s, has score %d\n", nodeName, res) return res, nil } else { klog.Infof("Error finding the uuid: %v", ok) } // //Close the database connection in the end of the execution // defer db.Close() // //Get the uuid of this node in order to query in the database // curr_uuid, ok := nodes[nodeName] // //Get the metrics for the current node // if ok { // results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid) // if err != nil { // panic(err.Error()) // proper error handling instead of panic in your app // } // sys := System{} // sys.Uuid = curr_uuid // for results.Next() { // // for each row, scan the result into our tag composite object // err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores) // if err != nil { // panic(err.Error()) // proper error handling instead of panic in your app // } // // and then print out the tag's Name attribute // klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", nodeName, sys.ID, sys.numCores) // } // } res := customRequestedScore(nodeName) klog.Infof("Node name %s, has score %d\n", nodeName, res) return res, nil } func customRequestedScore(nodeName string) int64 { if nodeName == "kube-01" { return 10 } return 0 }