| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package priorities
- import (
- "encoding/json"
- "fmt"
- "strings"
- _ "github.com/go-sql-driver/mysql"
- client "github.com/influxdata/influxdb1-client/v2"
- "k8s.io/klog"
- )
- var (
- nodeSelectionPriority = &CustomAllocationPriority{"NodeSelection", nodeSelectionScorer}
-
-
-
-
-
-
-
- NodeSelectionPriorityMap = nodeSelectionPriority.PriorityMap
- )
- func OneScorer(si scorerInput) float64 {
- return si.metrics[si.metricName]
- }
- func calculateWeightedAverageCores(response *client.Response,
- numberOfRows, numberOfMetrics, numberOfCores int) (map[string]float64, error) {
-
- metrics := make(map[string]float64, numberOfMetrics)
- rows := response.Results[0].Series[0]
- for i := 1; i < len(rows.Columns); i++ {
-
- for j := 0; j < numberOfRows; j++ {
- avg := 0.0
- for k := 0; k < numberOfCores; k++ {
- val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
- if err != nil {
- klog.Infof("Error while calculating %v", rows.Columns[i])
- return nil, err
- }
-
- avg += val / float64(numberOfCores)
- }
- metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
- }
- metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
-
- }
-
- return metrics, nil
- }
- func queryInfluxDbCores(metrics []string, uuid string, socket,
- numberOfRows int, cfg Config, c client.Client, cores []int) (*client.Response, error) {
-
-
-
-
-
-
-
- columns := strings.Join(metrics, ", ")
-
- var coresPart strings.Builder
- fmt.Fprintf(&coresPart, "core_id='%d'", cores[0])
- for i := 1; i < len(cores); i++ {
- fmt.Fprintf(&coresPart, " or core_id='%d'", cores[i])
- }
-
- var command strings.Builder
- fmt.Fprintf(&command, "SELECT %s from core_metrics where uuid = '%s' and socket_id='%d' and %s order by time desc limit %d", columns, uuid, socket, coresPart.String(), numberOfRows*len(cores))
-
- q := client.NewQuery(command.String(), cfg.Database.Name, "")
- response, err := c.Query(q)
- if err != nil {
- klog.Infof("Error while executing the query: %v", err.Error())
- return nil, err
- }
-
- return response, nil
- }
- func nodeSelectionScorer(nodeName string) (float64, error) {
-
-
-
- var cfg Config
- err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
- if err != nil {
- return 0, err
- }
-
-
- c, err := connectToInfluxDB(cfg)
- if err != nil {
- return 0, err
- }
-
- defer c.Close()
-
- curr_uuid, ok := Nodes[nodeName]
- socket, _ := Sockets[nodeName]
- cores, _ := Cores[nodeName]
- if len(cores) == 0 {
- return 0.0, nil
- }
-
- if ok {
- metrics := []string{"c6res"}
- time := 20
- numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
-
- r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, cores)
- if err != nil {
- klog.Infof("Error in querying or calculating average: %v", err.Error())
- return 0, nil
- }
- results, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(cores))
- res := calculateScore(scorerInput{metricName: "c6res", metrics: results}, OneScorer) * float64(len(cores))
-
- klog.Infof("Node name %s, Score %v\n", nodeName, res)
- return res, nil
- } else {
- klog.Infof("Error finding the uuid: %v", ok)
- return 0, nil
- }
- }
|