/* Copyright 2020 Achilleas Tzenetopoulos Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* Extension of the scheduler developed during my diploma thesis Make it pluggable with Kubernetes logic in scheduler extension Date started: 30/1/2020 Using the scheduling framework and plugins logic */ package noderesources import ( "context" "database/sql" "fmt" "os" _ "github.com/go-sql-driver/mysql" "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) type Config struct { Server struct { Port string `yaml:"port"` Host string `yaml:"host"` } `yaml:"server"` Database struct { Type string `yaml: "type"` Name string `yaml:"name"` Username string `yaml:"username"` Password string `yaml:"password"` } `yaml:"database"` } // Custom allocation is a score plugin that favors nodes with a custom scoring function taking into account custom resources. type CustomAllocated struct { handle framework.FrameworkHandle resourceAllocationScorer //customResourceAllocationScorer } type nodeMetrics struct { L3cacheMisses int64 memoryReads int64 memoryWrites int64 averageIPC float32 } type System struct { ID int `json:"id"` Uuid string `json:"uuid"` numSockets int `json:"num_sockets"` numCores int `json:"num_cores` } type wrongValueError struct{} var _ = framework.ScorePlugin(&CustomAllocated{}) //Map nodes to uuid //var nodes = make(map[string]string) var nodes = map[string]string{ "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002", "kube-02": "2", "kube-03": "2", "kube-04": "2", } // CustomAllocatedName is the name of the plugin used in the plugin registry and configurations. const CustomAllocatedName = "NodeResourcesCustomAllocated" // Name returns name of the plugin. It is used in logs, etc. func (ca *CustomAllocated) Name() string { return CustomAllocatedName } func readFile(cfg *Config) { f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml") if err != nil { panic(err.Error()) } defer f.Close() decoder := yaml.NewDecoder(f) err = decoder.Decode(&cfg) if err != nil { panic(err.Error()) } } func getCustomMetrics(name string) *nodeMetrics { var cfg Config readFile(&cfg) //Open the database connection DBstring := cfg.Database.Username + ":" + cfg.Database.Password + "@tcp(" + cfg.Server.Host + ":" + cfg.Server.Port + ")/" + cfg.Database.Name db, err := sql.Open(cfg.Database.Type, DBstring) if err != nil { panic(err.Error()) } defer db.Close() curr_uuid, ok := nodes[name] nd := nodeMetrics{} if ok { results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid) if err != nil { panic(err.Error()) // proper error handling instead of panic in your app } sys := System{} sys.Uuid = curr_uuid for results.Next() { // for each row, scan the result into our tag composite object err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores) if err != nil { panic(err.Error()) // proper error handling instead of panic in your app } // and then print out the tag's Name attribute klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", name, sys.ID, sys.numCores) } } //nd := nodeMetrics{} if name == "kube-01" { nd.L3cacheMisses = 1 nd.memoryReads = 1 nd.memoryWrites = 1 nd.averageIPC = 2.0 } else { //some sample values for testing purposes nd.L3cacheMisses = 200 nd.memoryReads = 50 nd.memoryWrites = 60 nd.averageIPC = 1.2 } //query metrics for this node provided in the input if nd.L3cacheMisses < 0 || nd.memoryReads < 0 || nd.memoryWrites < 0 || nd.averageIPC < 0 { //err := errors.New("Failed to get the values") return nil } return &nd } func (ca *CustomAllocated) scoringFunction(pod *v1.Pod, node *nodeMetrics, nodeInfo *nodeinfo.NodeInfo) int64 { //rawScore := float32(node.memoryReads+node.memoryWrites) / node.averageIPC intScore := int64(float32(node.memoryReads+node.memoryWrites) / node.averageIPC) //Sample implementation in order to check functionality klog.Infof("Score: %d", intScore) return intScore } // Score invoked at the score extension point. func (ca *CustomAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, err := ca.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } //Search into the monitored metrics for the specified node nodeMetrics := getCustomMetrics(nodeName) // ca.score favors nodes with 'better' monitored resources score. // It calculates the percentage of micro-architecture metrics like L3 cache misses and/or IPC and memory bandwidth, and // prioritizes based on the lowest score provided by a custom scoring function. // // Details: // TODO klog.Infof("Node: %s", nodeName) return ca.scoringFunction(pod, nodeMetrics, nodeInfo), nil //return ca.score(pod, nodeInfo) } // ScoreExtensions of the Score plugin. func (ca *CustomAllocated) ScoreExtensions() framework.ScoreExtensions { return nil } // NewCustomAllocated initializes a new plugin and returns it. func NewCustomAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { return &CustomAllocated{ handle: h, resourceAllocationScorer: resourceAllocationScorer{ CustomAllocatedName, customResourceScorer, defaultRequestedRatioResources, }, }, nil } func customResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 { //There I should call a function which will get my custom metrics var nodeScore, weightSum int64 for resource, weight := range defaultRequestedRatioResources { resourceScore := customRequestedScore(requested[resource], allocable[resource]) nodeScore += resourceScore * weight weightSum += weight } return nodeScore / weightSum } // The unused capacity is calculated on a scale of 0-10 // 0 being the lowest priority and 10 being the highest. // The more unused resources the higher the score is. func customRequestedScore(requested, capacity int64) int64 { if capacity == 0 { return 0 } if requested > capacity { return 0 } return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity }