123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package priorities
- import (
- "encoding/json"
- "fmt"
- "os"
- "strings"
- _ "github.com/go-sql-driver/mysql"
- client "github.com/influxdata/influxdb1-client/v2"
- "gopkg.in/yaml.v2"
- "k8s.io/klog"
- )
- var (
- customResourcePriority = &CustomAllocationPriority{"CustomResourceAllocation", customResourceScorer}
-
-
-
-
-
-
- CustomRequestedPriorityMap = customResourcePriority.PriorityMap
- )
- type Config struct {
- Server struct {
- Port string `yaml:"port"`
- Host string `yaml:"host"`
- } `yaml:"server"`
- Database struct {
- Type string `yaml:"type"`
- Name string `yaml:"name"`
- Username string `yaml:"username"`
- Password string `yaml:"password"`
- } `yaml:"database"`
- MonitoringSpecs struct {
- TimeInterval float32 `yaml:"interval"`
- } `yaml:"monitoring"`
- }
- var nodes = map[string]string{
- "kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- "kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
- }
- var sockets = map[string]int{
- "kube-01": 0,
- "kube-02": 0,
- "kube-03": 0,
- "kube-04": 0,
- "kube-05": 0,
- "kube-06": 1,
- "kube-07": 0,
- "kube-08": 1,
- }
- func readFile(cfg *Config, file string) {
- f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
- if err != nil {
- panic(err.Error())
- }
- defer f.Close()
- decoder := yaml.NewDecoder(f)
- err = decoder.Decode(&cfg)
- if err != nil {
- panic(err.Error())
- }
- }
- func customScoreFn(metrics map[string]float64) float64 {
- return metrics["reads"] * metrics["writes"] / metrics["ipc"]
- }
- func calculateScore(results map[string]float64,
- logicFn func(map[string]float64) float64) int64 {
- res := logicFn(results)
- klog.Infof("Has score (in float) %v\n", res)
-
-
-
- return int64(res)
- }
- func calculateWeightedAverage(response *client.Response,
- numberOfRows int, numberOfMetrics int) (map[string]float64, error) {
-
- metrics := make(map[string]float64, numberOfMetrics)
- rows := response.Results[0].Series[0]
- for i := 1; i < len(rows.Columns); i++ {
- for j := 0; j < numberOfRows; j++ {
- val, err := rows.Values[j][i].(json.Number).Float64()
- if err != nil {
- klog.Infof("Error while calculating %v", rows.Columns[i])
- return nil, err
- }
- metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
- }
- metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
- klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
- }
-
- return metrics, nil
- }
- func connectToInfluxDB(cfg Config) (client.Client, error) {
- c, err := client.NewHTTPClient(client.HTTPConfig{
- Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
- })
- if err != nil {
- klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
- return nil, err
- }
- klog.Infof("Connected Successfully to InfluxDB")
- return c, nil
- }
- func queryInfluxDB(metrics []string, uuid string, socket int,
- time int, cfg Config, c client.Client) (map[string]float64, error) {
-
-
- numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
-
- columns := strings.Join(metrics, ", ")
-
- command := fmt.Sprintf("SELECT %s from socket_metrics where uuid = '%s' and socket_id='%d' order by time desc limit %d", columns, uuid, socket, numberOfRows)
- q := client.NewQuery(command, cfg.Database.Name, "")
- response, err := c.Query(q)
- if err != nil {
- klog.Infof("Error while executing the query: %v", err.Error())
- return nil, err
- }
-
- return calculateWeightedAverage(response, numberOfRows, len(metrics))
- }
- func customResourceScorer(nodeName string) (int64, error) {
-
-
-
- var cfg Config
- readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
-
-
- c, err := connectToInfluxDB(cfg)
- if err != nil {
- return 0, err
- }
-
- defer c.Close()
-
- curr_uuid, ok := nodes[nodeName]
- socket, _ := sockets[nodeName]
- if ok {
- results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
- if err != nil {
- klog.Infof("Error in querying or calculating average: %v", err.Error())
- return 0, nil
- }
- res := calculateScore(results, customScoreFn)
- klog.Infof("Node name %s, has score %d\n", nodeName, res)
- return res, nil
- } else {
- klog.Infof("Error finding the uuid: %v", ok)
- return 0, nil
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- }
- func customRequestedScore(nodeName string) int64 {
- if nodeName == "kube-01" {
- return 10
- }
- return 0
- }
|