Przeglądaj źródła

added functionality in scoring

Achilleas Tzenetopoulos 5 lat temu
rodzic
commit
a6202bec19

+ 103 - 22
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"strings"
 
 	_ "github.com/go-sql-driver/mysql"
 	client "github.com/influxdata/influxdb1-client/v2"
@@ -50,8 +51,19 @@ type Config struct {
 		Username string `yaml:"username"`
 		Password string `yaml:"password"`
 	} `yaml:"database"`
+	MonitoringSpecs struct {
+		TimeInterval float32 `yaml: "interval"`
+	} `yaml: "monitoring"`
 }
 
+// type Row struct {
+// 	ipc    float32
+// 	l3m    float32
+// 	reads  float32
+// 	writes float32
+// 	c6res  float32
+// }
+
 type System struct {
 	ID         int    `json:"id"`
 	Uuid       string `json:"uuid"`
@@ -70,6 +82,17 @@ var nodes = map[string]string{
 	"kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
 }
 
+var sockets = map[string]int{
+	"kube-01": 0,
+	"kube-02": 0,
+	"kube-03": 0,
+	"kube-04": 0,
+	"kube-05": 0,
+	"kube-06": 1,
+	"kube-07": 0,
+	"kube-08": 1,
+}
+
 func readFile(cfg *Config, file string) {
 	f, err := os.Open("/etc/kubernetes/scheduler-monitoringDB.yaml")
 	if err != nil {
@@ -84,7 +107,75 @@ func readFile(cfg *Config, file string) {
 	}
 }
 
-func customResourceScorer(nodeName string) int64 {
+func customScoreFn(metrics map[string]float64) float64 {
+	return metrics["reads"] * metrics["writes"] / metrics["ipc"]
+}
+
+func calculateScore(results map[string]float64,
+	logicFn func(map[string]float64) float64) int64 {
+
+	res := logicFn(results)
+
+	// TODO
+	// While the final score should be an integer,
+	// find a solution about resolving the float produced
+	return int64(res)
+}
+
+func calculateWeightedAverage(response *client.Response,
+	numberOfRows int, numberOfMetrics int) (map[string]float64, error) {
+	// initialize the metrics map with a constant size
+	metrics := make(map[string]float64, numberOfMetrics)
+	rows := response.Results[0].Series[0]
+	for i := 1; i < len(rows.Columns); i++ {
+		for j := 0; j < numberOfRows; j++ {
+			val, err := rows.Values[j][i].(json.Number).Float64()
+			if err != nil {
+				klog.Infof("Error while calculating %v", rows.Columns[i])
+				return nil, err
+			}
+			metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
+		}
+		metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
+	}
+	// TODO better handling for the returning errors
+	return metrics, nil
+}
+
+func connectToInfluxDB(cfg Config) (client.Client, error) {
+	c, err := client.NewHTTPClient(client.HTTPConfig{
+		Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
+	})
+	if err != nil {
+		klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
+		return nil, err
+	}
+	klog.Infof("Connected Successfully to InfluxDB")
+	return c, nil
+
+}
+
+func queryInfluxDB(metrics []string, uuid string,
+	time int, cfg Config, c client.Client) (map[string]float64, error) {
+
+	// calculate the number of rows needed
+	// i.e. 20sec / 0.5s interval => 40rows
+	numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
+	// merge all the required columns
+	columns := strings.Join(metrics, ", ")
+	// build the coommand
+	command := fmt.Sprintf("SELECT %s from system_metrics where uuid = '%s' order by time desc limit %d", columns, uuid, numberOfRows)
+	q := client.NewQuery(command, cfg.Database.Name, "")
+	response, err := c.Query(q)
+	if err != nil {
+		klog.Infof("Error while executing the query: %v", err.Error())
+		return nil, err
+	}
+	// Calculate the average for the metrics provided
+	return calculateWeightedAverage(response, numberOfRows, len(metrics))
+}
+
+func customResourceScorer(nodeName string) (int64, error) {
 	//return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
 	//customRequestedScore(requested.Memory, allocable.Memory)) / 2
 
@@ -95,37 +186,26 @@ func customResourceScorer(nodeName string) int64 {
 	//TODO read also nodes to uuid mappings
 	-------------------------------------*/
 
-	//Access the Database
-	// DBstring := cfg.Database.Username + ":" + cfg.Database.Password + "@tcp(" + cfg.Server.Host + ":" + cfg.Server.Port + ")/" + cfg.Database.Name
-	// _, err := sql.Open(cfg.Database.Type, DBstring)
-	// if err != nil {
-	// 	panic(err.Error())
-	// }
-
 	// InfluxDB
-	c, err := client.NewHTTPClient(client.HTTPConfig{
-		Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
-	})
+	c, err := connectToInfluxDB(cfg)
 	if err != nil {
-		klog.Infof("Error while creating InfluxDB client: %v ", err.Error())
-	} else {
-		klog.Infof("Connected Successfully to InfluxDB")
+		return 0, err
 	}
 	// close the connection in the end of execution
-
 	defer c.Close()
 
 	//Get the uuid of this node in order to query in the database
 	curr_uuid, ok := nodes[nodeName]
 
 	if ok {
-		command := fmt.Sprintf("SELECT ipc from system_metrics where uuid = '%s' order by time desc limit 1", curr_uuid)
-		q := client.NewQuery(command, "evolve", "")
-		if response, err := c.Query(q); err == nil && response.Error() == nil {
-			klog.Infof("%v", response.Results[0].Series[0].Values[0][1].(json.Number))
-		} else if err != nil {
-			klog.Infof("Error: %v", err)
+		results, err := queryInfluxDB([]string{"ipc", "l3m", "c6res"}, curr_uuid, 20, cfg, c)
+		if err != nil {
+			klog.Infof("Error in querying or calculating average: %v", err.Error())
+			return 0, nil
 		}
+		res := calculateScore(results, customScoreFn)
+		klog.Infof("Node name %s, has score %d\n", nodeName, res)
+		return res, nil
 	} else {
 		klog.Infof("Error finding the uuid: %v", ok)
 	}
@@ -156,9 +236,10 @@ func customResourceScorer(nodeName string) int64 {
 	// 		klog.Infof("This is the system with name: %s, id: %d and  number of cores: %d", nodeName, sys.ID, sys.numCores)
 	// 	}
 	// }
+
 	res := customRequestedScore(nodeName)
 	klog.Infof("Node name %s, has score %d\n", nodeName, res)
-	return res
+	return res, nil
 }
 
 func customRequestedScore(nodeName string) int64 {

+ 2 - 2
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/resource_allocation.go

@@ -36,7 +36,7 @@ type ResourceAllocationPriority struct {
 
 type CustomAllocationPriority struct {
 	Name   string
-	scorer func(nodeName string) int64
+	scorer func(nodeName string) (int64, error)
 }
 
 // PriorityMap priorities nodes according to the resource allocations on the node.
@@ -124,7 +124,7 @@ func (r *CustomAllocationPriority) PriorityMap(
 	// 	score = r.scorer(&requested, &allocatable, false, 0, 0)
 	// }
 
-	score = r.scorer(node.Name)
+	score, _ = r.scorer(node.Name)
 
 	// if klog.V(10) {
 	// 	if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {