Browse Source

added second priority function for cores

Achilleas Tzenetopoulos 5 years ago
parent
commit
1568f117da

+ 6 - 4
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -71,6 +71,8 @@ type Config struct {
 // 	numCores   int    `json:"num_cores`
 // }
 
+// TODO:
+// place those maps in another file inside the package
 var nodes = map[string]string{
 	"kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
 	"kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
@@ -110,8 +112,8 @@ func readFile(cfg *Config, file string) error {
 	return nil
 }
 
-func customScoreFn(metrics map[string]float64) float64 {
-	return metrics["ipc"] / metrics["mem_read"] * metrics["mem_write"]
+func customScoreFn(si scorerInput) float64 {
+	return si.metrics["ipc"] / si.metrics["mem_read"] * si.metrics["mem_write"]
 }
 
 func onlyIPC(metrics map[string]float64) float64 {
@@ -127,9 +129,9 @@ func onlyNrg(metrics map[string]float64) float64 {
 }
 
 func calculateScore(results map[string]float64,
-	logicFn func(map[string]float64) float64) float64 {
+	logicFn func(scorerInput) float64) float64 {
 
-	res := logicFn(results)
+	res := logicFn(scorerInput{metrics: results})
 	//klog.Infof("Has score (in float) %v\n", res)
 
 	return res

+ 277 - 0
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/node_selection.go

@@ -0,0 +1,277 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package priorities
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	_ "github.com/go-sql-driver/mysql"
+	client "github.com/influxdata/influxdb1-client/v2"
+	"k8s.io/klog"
+)
+
+var (
+	nodeSelectionPriority = &CustomAllocationPriority{"NodeSelection", nodeSelectionScorer}
+	//customResourcePriority = &CustomAllocationPriority{"CustomRequestedPriority", customResourceScorer}
+	// LeastRequestedPriorityMap is a priority function that favors nodes with fewer requested resources.
+	// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and
+	// prioritizes based on the minimum of the average of the fraction of requested to capacity.
+	//
+	// Details:
+	// (cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
+	NodeSelectionPriorityMap = nodeSelectionPriority.PriorityMap
+)
+
+// type Config struct {
+// 	Server struct {
+// 		Port string `yaml:"port"`
+// 		Host string `yaml:"host"`
+// 	} `yaml:"server"`
+// 	Database struct {
+// 		Type     string `yaml:"type"`
+// 		Name     string `yaml:"name"`
+// 		Username string `yaml:"username"`
+// 		Password string `yaml:"password"`
+// 	} `yaml:"database"`
+// 	MonitoringSpecs struct {
+// 		TimeInterval float32 `yaml:"interval"`
+// 	} `yaml:"monitoring"`
+// }
+
+// type Row struct {
+// 	ipc    float32
+// 	l3m    float32
+// 	reads  float32
+// 	writes float32
+// 	c6res  float32
+// }
+
+// type System struct {
+// 	ID         int    `json:"id"`
+// 	Uuid       string `json:"uuid"`
+// 	numSockets int    `json:"num_sockets"`
+// 	numCores   int    `json:"num_cores`
+// }
+
+// var nodes = map[string]string{
+// 	"kube-01": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-02": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-03": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-04": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-05": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-06": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-07": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// 	"kube-08": "c4766d29-4dc1-11ea-9d98-0242ac110002",
+// }
+
+// var sockets = map[string]int{
+// 	"kube-01": 0,
+// 	"kube-02": 0,
+// 	"kube-03": 0,
+// 	"kube-04": 0,
+// 	"kube-05": 0,
+// 	"kube-06": 1,
+// 	"kube-07": 0,
+// 	"kube-08": 1,
+// }
+
+var cores = map[string][]int{
+	"kube-01": []int{},
+	"kube-02": []int{},
+	"kube-03": []int{},
+	"kube-04": []int{},
+	"kube-05": []int{0, 1, 2, 3},
+	"kube-06": []int{12, 13, 14, 15, 16, 17, 18, 19},
+	"kube-07": []int{4, 5, 6, 7, 8, 9, 10, 11, 24, 25, 26, 27, 28, 29, 30, 31},
+	"kube-08": []int{20, 21, 22, 23, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47},
+}
+
+// func readFile(cfg *Config, file string) error {
+// 	f, err := os.Open(file)
+// 	if err != nil {
+// 		klog.Infof("Config file for scheduler not found. Error: %v", err)
+// 		return err
+// 	}
+// 	defer f.Close()
+
+// 	decoder := yaml.NewDecoder(f)
+// 	err = decoder.Decode(&cfg)
+// 	if err != nil {
+// 		klog.Infof("Unable to decode the config file. Error: %v", err)
+// 		return err
+// 	}
+// 	return nil
+// }
+
+type scorerInput struct {
+	metricName string
+	metrics    map[string]float64
+}
+
+func OneScorer(si scorerInput) float64 {
+	return si.metrics[si.metricName]
+}
+
+// func customScoreFn(metrics map[string]float64) float64 {
+// 	return metrics["ipc"] / metrics["mem_read"] * metrics["mem_write"]
+// }
+
+// func onlyIPC(metrics map[string]float64) float64 {
+// 	return metrics["ipc"]
+// }
+
+// func onlyL3(metrics map[string]float64) float64 {
+// 	return 1 / metrics["l3m"]
+// }
+
+// func onlyNrg(metrics map[string]float64) float64 {
+// 	return 1 / metrics["procnrg"]
+// }
+
+// func calculateScore(results map[string]float64,
+// 	logicFn func(map[string]float64) float64) float64 {
+
+// 	res := logicFn(results)
+// 	//klog.Infof("Has score (in float) %v\n", res)
+
+// 	return res
+// }
+
+func calculateWeightedAverageCores(response *client.Response,
+	numberOfRows, numberOfMetrics, numberOfCores int) (map[string]float64, error) {
+	// initialize the metrics map with a constant size
+	metrics := make(map[string]float64, numberOfMetrics)
+	rows := response.Results[0].Series[0]
+	for i := 1; i < len(rows.Columns); i++ {
+		for j := 0; j < numberOfRows; j++ {
+			avg := 0.0
+			for k := 0; k < numberOfCores; k++ {
+				val, err := rows.Values[j*numberOfCores+k][i].(json.Number).Float64()
+				if err != nil {
+					klog.Infof("Error while calculating %v", rows.Columns[i])
+					return nil, err
+				}
+				//metrics[rows.Columns[i]] += val * float64(numberOfRows-j)
+				avg += val
+			}
+			metrics[rows.Columns[i]] += avg * float64(numberOfRows-j)
+		}
+		metrics[rows.Columns[i]] = metrics[rows.Columns[i]] / float64((numberOfRows * (numberOfRows + 1) / 2))
+		klog.Infof("%v : %v", rows.Columns[i], metrics[rows.Columns[i]])
+	}
+	// TODO better handling for the returning errors
+	return metrics, nil
+}
+
+// func connectToInfluxDB(cfg Config) (client.Client, error) {
+// 	c, err := client.NewHTTPClient(client.HTTPConfig{
+// 		Addr: "http://" + cfg.Server.Host + ":" + cfg.Server.Port + "",
+// 	})
+// 	if err != nil {
+// 		klog.Infof("Error while connecting to InfluxDB: %v ", err.Error())
+// 		return nil, err
+// 	}
+// 	klog.Infof("Connected Successfully to InfluxDB")
+// 	return c, nil
+
+// }
+
+// This function does the following:
+// 1. Queries the DB with the provided metrics and cores
+// 2. Calculates and returns the weighted average of each of those metrics
+func queryInfluxDbCores(metrics []string, uuid string, socket,
+	time int, cfg Config, c client.Client, cores []int) (map[string]float64, error) {
+
+	// calculate the number of rows needed
+	// i.e. 20sec / 0.5s interval => 40rows
+	numberOfRows := int(float32(time) / cfg.MonitoringSpecs.TimeInterval)
+	// EDIT
+	// This time we will fetch data for multiple cores
+	// so we will need more rows, proportional to the core number
+	numberOfRows *= len(cores)
+	// merge all the required columns
+	columns := strings.Join(metrics, ", ")
+
+	// build the cores part of the command
+	var coresPart strings.Builder
+	fmt.Fprintf(&coresPart, "core_id='%d'", cores[0])
+	for i := 1; i < len(cores); i++ {
+		fmt.Fprintf(&coresPart, " or core_id='%d'", cores[i])
+	}
+
+	// build the coommand
+	var command strings.Builder
+	fmt.Fprintf(&command, "SELECT %s from core_metrics where uuid = '%s' and socket_id='%d' and %s order by time desc limit %d", columns, uuid, socket, coresPart.String(), numberOfRows)
+	q := client.NewQuery(command.String(), cfg.Database.Name, "")
+	response, err := c.Query(q)
+	if err != nil {
+		klog.Infof("Error while executing the query: %v", err.Error())
+		return nil, err
+	}
+	// Calculate the average for the metrics provided
+	return calculateWeightedAverageCores(response, numberOfRows, len(metrics), len(cores))
+}
+
+func nodeSelectionScorer(nodeName string) (float64, error) {
+	//return (customRequestedScore(requested.MilliCPU, allocable.MilliCPU) +
+	//customRequestedScore(requested.Memory, allocable.Memory)) / 2
+
+	//read database information
+	var cfg Config
+	err := readFile(&cfg, "/etc/kubernetes/scheduler-monitoringDB.yaml")
+	if err != nil {
+		return 0, err
+	}
+
+	/*-------------------------------------
+	//TODO read also nodes to uuid mappings for EVOLVE
+	-------------------------------------*/
+
+	// InfluxDB
+	c, err := connectToInfluxDB(cfg)
+	if err != nil {
+		return 0, err
+	}
+	// close the connection in the end of execution
+	defer c.Close()
+
+	//Get the uuid of this node in order to query in the database
+	curr_uuid, ok := nodes[nodeName]
+	socket, _ := sockets[nodeName]
+	cores, _ := cores[nodeName]
+	if ok {
+
+		// Select Socket
+		results, err := queryInfluxDbCores([]string{"c6res"}, curr_uuid, socket, 20, cfg, c, cores)
+		if err != nil {
+			klog.Infof("Error in querying or calculating average: %v", err.Error())
+			return 0, nil
+		}
+
+		res := calculateScore(results, OneScorer)
+
+		// Select Node
+
+		klog.Infof("Node name %s, has score %v\n", nodeName, res)
+		return res, nil
+	} else {
+		klog.Infof("Error finding the uuid: %v", ok)
+		return 0, nil
+	}
+}

+ 2 - 0
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/priorities.go

@@ -53,4 +53,6 @@ const (
 	ResourceLimitsPriority = "ResourceLimitsPriority"
 	// Custom Resource Priority
 	CustomRequestedPriority = "CustomRequestedPriority"
+	// Node Selection Priority
+	NodeSelectionPriority = "NodeSelectionPriority"
 )

+ 1 - 0
kubernetes-v1.15.4/pkg/scheduler/algorithmprovider/defaults/defaults.go

@@ -116,6 +116,7 @@ func defaultPriorities() sets.String {
 		priorities.TaintTolerationPriority,
 		priorities.ImageLocalityPriority,
 		priorities.CustomRequestedPriority,
+		priorities.NodeSelectionPriority,
 	)
 }
 

+ 2 - 0
kubernetes-v1.15.4/pkg/scheduler/algorithmprovider/defaults/register_priorities.go

@@ -82,6 +82,8 @@ func init() {
 	// Prioritize nodes by custom function from custom metrics
 	factory.SocketRegisterPriorityFunction2(priorities.CustomRequestedPriority, priorities.CustomRequestedPriorityMap, nil, 1000000, true)
 
+	//
+	factory.RegisterPriorityFunction2(priorities.NodeSelectionPriority, priorities.NodeSelectionPriorityMap, nil, 1000000)
 	// Prioritizes nodes to help achieve balanced resource usage
 	factory.RegisterPriorityFunction2(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)
 

+ 10 - 2
kubernetes-v1.15.4/pkg/scheduler/core/generic_scheduler.go

@@ -279,12 +279,20 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 		}
 	}
 
-	priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, winningSocketNodes, g.extenders)
+	trace.Step("Selecting host")
+	nodePrioritizers := []priorities.PriorityConfig{
+		{
+			Name:   priorities.NodeSelectionPriority,
+			Map:    priorities.NodeSelectionPriorityMap,
+			Weight: 100,
+		},
+	}
+	priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, nodePrioritizers, winningSocketNodes, g.extenders)
 
 	// -----------------------------------------------------
 	// ------------------END-CUSTOM-----------------------
 	// -----------------------------------------------------
-	trace.Step("Selecting host")
+	//trace.Step("Selecting host")
 
 	host, err := g.selectHost(priorityList)