Explorar o código

added core availability features

Achilleas Tzenetopoulos %!s(int64=5) %!d(string=hai) anos
pai
achega
fdf66341ef

+ 62 - 5
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -84,12 +84,12 @@ func calculateWeightedAverage(response *client.Response,
 	return metrics, nil
 }
 
-func queryInfluxDB(metrics []string, uuid string, socket,
-	time int, cfg Config, c client.Client) (map[string]float64, error) {
+func customScoreInfluxDB(metrics []string, uuid string, socket,
+	numberOfRows int, cfg Config, c client.Client) (map[string]float64, error) {
 
 	// calculate the number of rows needed
 	// i.e. 20sec / 0.5s interval => 40rows
-	numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
+	//numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
 	// merge all the required columns
 	columns := strings.Join(metrics, ", ")
 	// build the coommand
@@ -131,17 +131,36 @@ func customResourceScorer(nodeName string) (float64, error) {
 	//Get the uuid of this node in order to query in the database
 	curr_uuid, ok := nodes[nodeName]
 	socket, _ := sockets[nodeName]
+	cores, _ := cores[nodeName]
 
 	if ok {
 
+		metrics := []string{"c6res"}
+		time := 20
+
+		numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
+
+		// Define Core availability
+		r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, cores)
+		if err != nil {
+			klog.Infof("Error in querying or calculating core availability in the first stage: %v", err.Error())
+		}
+		average, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(cores))
+		if err != nil {
+			klog.Infof("Error defining core availability")
+		}
+
 		// Select Socket
-		results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
+		results, err := customScoreInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, numberOfRows, cfg, c)
 		if err != nil {
-			klog.Infof("Error in querying or calculating average: %v", err.Error())
+			klog.Infof("Error in querying or calculating average for the custom score in the first stage: %v", err.Error())
 			return 0, nil
 		}
 		res := calculateScore(scorerInput{metrics: results}, customScoreFn)
 
+		if sum := average["c6res"] * float64(len(cores)); sum > 1 {
+			res = res * sum
+		}
 		// Select Node
 
 		klog.Infof("Node name %s, has score %v\n", nodeName, res)
@@ -150,4 +169,42 @@ func customResourceScorer(nodeName string) (float64, error) {
 		klog.Infof("Error finding the uuid: %v", ok)
 		return 0, nil
 	}
+
 }
+
+// WARNING
+// c6res is not a dependable metric for isnpecting core availability
+// Some Systems use higher core states (e.g c7res)
+// func findAvailability(response *client.Response, numberOfMetrics, numberOfRows, numberOfCores int, floor float64) (map[string]float64, error) {
+// 	// initialize the metrics map with a constant size
+// 	metrics := make(map[string]float64, numberOfMetrics)
+// 	rows := response.Results[0].Series[0]
+// 	for i := 1; i < len(rows.Columns); i++ {
+// 		//klog.Infof("Name of column %v : %v\nrange of values: %v\nnumber of rows: %v\nnumber of cores %v\n", i, rows.Columns[i], len(rows.Values), numberOfRows, numberOfCores)
+// 		for j := 0; j < numberOfRows; j++ {
+// 			//avg, max := 0.0, 0.0
+// 			for k := 0; k < numberOfCores; k++ {
+// 				val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
+// 				if err != nil {
+// 					klog.Infof("Error while calculating %v", rows.Columns[i])
+// 					return false, err
+// 				}
+// 				// if val > floor {
+// 				// 	return true, nil
+// 				// }
+// 				// sum += val
+
+// 				//avg += val / float64(numberOfCores)
+// 				avg += val
+// 			}
+// 			metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
+// 		}
+// 		metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
+// 		if metrics[row.Columns[i]] > 1 {
+// 			return true, nil
+// 		}
+// 		//klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
+// 	}
+// 	// TODO better handling for the returning errors
+// 	return false, nil
+// }

+ 11 - 5
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/node_selection.go

@@ -73,11 +73,11 @@ func calculateWeightedAverageCores(response *client.Response,
 // 1. Queries the DB with the provided metrics and cores
 // 2. Calculates and returns the weighted average of each of those metrics
 func queryInfluxDbCores(metrics []string, uuid string, socket,
-	time int, cfg Config, c client.Client, cores []int) (map[string]float64, error) {
+	numberOfRows int, cfg Config, c client.Client, cores []int) (*client.Response, error) {
 
 	// calculate the number of rows needed
 	// i.e. 20sec / 0.2s interval => 100rows
-	numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
+	//numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
 	// EDIT
 	// This time we will fetch data for multiple cores
 	// so we will need more rows, proportional to the core number
@@ -102,7 +102,7 @@ func queryInfluxDbCores(metrics []string, uuid string, socket,
 		return nil, err
 	}
 	// Calculate the average for the metrics provided
-	return calculateWeightedAverageCores(response, numberOfRows, len(metrics), len(cores))
+	return response, nil
 }
 
 func nodeSelectionScorer(nodeName string) (float64, error) {
@@ -139,14 +139,20 @@ func nodeSelectionScorer(nodeName string) (float64, error) {
 	//klog.Infof("Node %v has %v cores", nodeName, len(cores))
 	if ok {
 
+		metrics := []string{"c6res"}
+		time := 20
+
+		numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
 		// Select Socket
-		results, err := queryInfluxDbCores([]string{"c6res"}, curr_uuid, socket, 20, cfg, c, cores)
+		r, err := queryInfluxDbCores(metrics, curr_uuid, socket, numberOfRows, cfg, c, cores)
 		if err != nil {
 			klog.Infof("Error in querying or calculating average: %v", err.Error())
 			return 0, nil
 		}
 
-		res := calculateScore(scorerInput{metricName: "c6res", metrics: results}, OneScorer)
+		results, err := calculateWeightedAverageCores(r, numberOfRows, len(metrics), len(cores))
+
+		res := calculateScore(scorerInput{metricName: "c6res", metrics: results}, OneScorer) * float64(len(cores))
 
 		// Select Node