123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- /*
- Copyright 2020 Achilleas Tzenetopoulos
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- /*
- Extension of the scheduler developed during my diploma thesis
- Make it pluggable with Kubernetes logic in scheduler extension
- Date started: 30/1/2020
- Using the scheduling framework and plugins logic
- */
- package noderesources
- import (
- "context"
- "database/sql"
- "fmt"
- "os"
- _ "github.com/go-sql-driver/mysql"
- "gopkg.in/yaml.v2"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/klog"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- )
// Config holds the monitoring-database connection settings that readFile
// decodes from the scheduler's YAML configuration file.
type Config struct {
	// Server is the host and port used to build the TCP part of the
	// database connection string.
	Server struct {
		Port string `yaml:"port"`
		Host string `yaml:"host"`
	} `yaml:"server"`
	// Database names the SQL driver and schema and carries the credentials.
	Database struct {
		// Type is handed to sql.Open as the driver name. A struct tag must
		// not have a space between the key and the quoted value, otherwise
		// the tag cannot be parsed and is ignored.
		Type     string `yaml:"type"`
		Name     string `yaml:"name"`
		Username string `yaml:"username"`
		Password string `yaml:"password"`
	} `yaml:"database"`
}
- // Custom allocation is a score plugin that favors nodes with a custom scoring function taking into account custom resources.
- type CustomAllocated struct {
- handle framework.FrameworkHandle
- resourceAllocationScorer
- //customResourceAllocationScorer
- }
// nodeMetrics is the set of monitored per-node values consumed by the custom
// scoring function.
type nodeMetrics struct {
	// L3cacheMisses is a count of L3 cache misses.
	L3cacheMisses int64
	// memoryReads and memoryWrites are the memory traffic counters; their
	// sum is the numerator of the score.
	memoryReads, memoryWrites int64
	// averageIPC is the average instructions-per-cycle figure; it is the
	// denominator of the score.
	averageIPC float32
}
- type System struct {
- ID int `json:"id"`
- Uuid string `json:"uuid"`
- numSockets int `json:"num_sockets"`
- numCores int `json:"num_cores`
- }
- type wrongValueError struct{}
- var _ = framework.ScorePlugin(&CustomAllocated{})
// nodes maps a Kubernetes node name to the UUID under which that machine is
// looked up in the monitoring database's systems table.
var nodes = map[string]string{
	"kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
	"kube-02": "2",
	"kube-03": "2",
	"kube-04": "2",
}

// CustomAllocatedName is the name under which the plugin appears in the
// plugin registry and in scheduler configurations.
const CustomAllocatedName = "NodeResourcesCustomAllocated"
- // Name returns name of the plugin. It is used in logs, etc.
- func (ca *CustomAllocated) Name() string {
- return CustomAllocatedName
- }
- func readFile(cfg *Config) {
- f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
- if err != nil {
- panic(err.Error())
- }
- defer f.Close()
- decoder := yaml.NewDecoder(f)
- err = decoder.Decode(&cfg)
- if err != nil {
- panic(err.Error())
- }
- }
- func getCustomMetrics(name string) *nodeMetrics {
- var cfg Config
- readFile(&cfg)
- //Open the database connection
- DBstring := cfg.Database.Username + ":" + cfg.Database.Password + "@tcp(" + cfg.Server.Host + ":" + cfg.Server.Port + ")/" + cfg.Database.Name
- db, err := sql.Open(cfg.Database.Type, DBstring)
- if err != nil {
- panic(err.Error())
- }
- defer db.Close()
- curr_uuid, ok := nodes[name]
- nd := nodeMetrics{}
- if ok {
- results, err := db.Query("SELECT id, num_sockets, num_cores FROM systems WHERE uuid = ?", curr_uuid)
- if err != nil {
- panic(err.Error()) // proper error handling instead of panic in your app
- }
- sys := System{}
- sys.Uuid = curr_uuid
- for results.Next() {
- // for each row, scan the result into our tag composite object
- err = results.Scan(&sys.ID, &sys.numSockets, &sys.numCores)
- if err != nil {
- panic(err.Error()) // proper error handling instead of panic in your app
- }
- // and then print out the tag's Name attribute
- klog.Infof("This is the system with name: %s, id: %d and number of cores: %d", name, sys.ID, sys.numCores)
- }
- }
- //nd := nodeMetrics{}
- if name == "kube-01" {
- nd.L3cacheMisses = 1
- nd.memoryReads = 1
- nd.memoryWrites = 1
- nd.averageIPC = 2.0
- } else {
- //some sample values for testing purposes
- nd.L3cacheMisses = 200
- nd.memoryReads = 50
- nd.memoryWrites = 60
- nd.averageIPC = 1.2
- }
- //query metrics for this node provided in the input
- if nd.L3cacheMisses < 0 || nd.memoryReads < 0 || nd.memoryWrites < 0 || nd.averageIPC < 0 {
- //err := errors.New("Failed to get the values")
- return nil
- }
- return &nd
- }
- func (ca *CustomAllocated) scoringFunction(pod *v1.Pod, node *nodeMetrics, nodeInfo *nodeinfo.NodeInfo) int64 {
- //rawScore := float32(node.memoryReads+node.memoryWrites) / node.averageIPC
- intScore := int64(float32(node.memoryReads+node.memoryWrites) / node.averageIPC)
- //Sample implementation in order to check functionality
- klog.Infof("Score: %d", intScore)
- return intScore
- }
- // Score invoked at the score extension point.
- func (ca *CustomAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
- nodeInfo, err := ca.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
- if err != nil {
- return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
- }
- //Search into the monitored metrics for the specified node
- nodeMetrics := getCustomMetrics(nodeName)
- // ca.score favors nodes with 'better' monitored resources score.
- // It calculates the percentage of micro-architecture metrics like L3 cache misses and/or IPC and memory bandwidth, and
- // prioritizes based on the lowest score provided by a custom scoring function.
- //
- // Details:
- // TODO
- klog.Infof("Node: %s", nodeName)
- return ca.scoringFunction(pod, nodeMetrics, nodeInfo), nil
- //return ca.score(pod, nodeInfo)
- }
- // ScoreExtensions of the Score plugin.
- func (ca *CustomAllocated) ScoreExtensions() framework.ScoreExtensions {
- return nil
- }
- // NewCustomAllocated initializes a new plugin and returns it.
- func NewCustomAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
- return &CustomAllocated{
- handle: h,
- resourceAllocationScorer: resourceAllocationScorer{
- CustomAllocatedName,
- customResourceScorer,
- defaultRequestedRatioResources,
- },
- }, nil
- }
- func customResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
- //There I should call a function which will get my custom metrics
- var nodeScore, weightSum int64
- for resource, weight := range defaultRequestedRatioResources {
- resourceScore := customRequestedScore(requested[resource], allocable[resource])
- nodeScore += resourceScore * weight
- weightSum += weight
- }
- return nodeScore / weightSum
- }
- // The unused capacity is calculated on a scale of 0-10
- // 0 being the lowest priority and 10 being the highest.
- // The more unused resources the higher the score is.
- func customRequestedScore(requested, capacity int64) int64 {
- if capacity == 0 {
- return 0
- }
- if requested > capacity {
- return 0
- }
- return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
- }
|