Selaa lähdekoodia

two-stages-scheduling

Achilleas Tzenetopoulos 5 vuotta sitten
vanhempi
commit
0c4ab5bd81

+ 8 - 2
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/custom_resource_allocation.go

@@ -136,7 +136,7 @@ func calculateScore(results map[string]float64,
 }
 
 func calculateWeightedAverage(response *client.Response,
-	numberOfRows int, numberOfMetrics int) (map[string]float64, error) {
+	numberOfRows, numberOfMetrics int) (map[string]float64, error) {
 	// initialize the metrics map with a constant size
 	metrics := make(map[string]float64, numberOfMetrics)
 	rows := response.Results[0].Series[0]
@@ -169,7 +169,7 @@ func connectToInfluxDB(cfg Config) (client.Client, error) {
 
 }
 
-func queryInfluxDB(metrics []string, uuid string, socket int,
+func queryInfluxDB(metrics []string, uuid string, socket,
 	time int, cfg Config, c client.Client) (map[string]float64, error) {
 
 	// calculate the number of rows needed
@@ -199,6 +199,7 @@ func customResourceScorer(nodeName string) (float64, error) {
 	if err != nil {
 		return 0, err
 	}
+
 	/*-------------------------------------
 	//TODO read also nodes to uuid mappings for EVOLVE
 	-------------------------------------*/
@@ -216,12 +217,17 @@ func customResourceScorer(nodeName string) (float64, error) {
 	socket, _ := sockets[nodeName]
 
 	if ok {
+
+		// Select Socket
 		results, err := queryInfluxDB([]string{"ipc", "mem_read", "mem_write"}, curr_uuid, socket, 20, cfg, c)
 		if err != nil {
 			klog.Infof("Error in querying or calculating average: %v", err.Error())
 			return 0, nil
 		}
 		res := calculateScore(results, customScoreFn)
+
+		// Select Node
+
 		klog.Infof("Node name %s, has score %v\n", nodeName, res)
 		return res, nil
 	} else {

+ 18 - 1
kubernetes-v1.15.4/pkg/scheduler/algorithm/priorities/types.go

@@ -51,7 +51,10 @@ type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernode
 // PriorityConfig is a config used for a priority function.
 type PriorityConfig struct {
 	Name string
-	Map  PriorityMapFunction
+	// custom
+	isSocket bool
+	// custom
+	Map PriorityMapFunction
 	//CustomMap    CustomPriorityMapFunction
 	Reduce PriorityReduceFunction
 	//CustomReduce CustomPriorityReduceFunction
@@ -62,6 +65,20 @@ type PriorityConfig struct {
 	Weight int
 }
 
+// Custom PriorityConfig is a config used for a socket priority function
+// type SocketPriorityConfig {
+// 	Name string
+// 	Map  PriorityMapFunction
+// 	//CustomMap    CustomPriorityMapFunction
+// 	Reduce PriorityReduceFunction
+// 	//CustomReduce CustomPriorityReduceFunction
+// 	// TODO: Remove it after migrating all functions to
+// 	// Map-Reduce pattern.
+// 	Function PriorityFunction
+// 	//CustomFunction CustomPriorityFunction
+// 	Weight int
+// }
+
 // EmptyPriorityMetadataProducer returns a no-op PriorityMetadataProducer type.
 func EmptyPriorityMetadataProducer(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) interface{} {
 	return nil

+ 0 - 1
kubernetes-v1.15.4/pkg/scheduler/algorithm/scheduler_interface.go

@@ -40,7 +40,6 @@ type SchedulerExtender interface {
 	// are used to compute the weighted score for an extender. The weighted scores are added to
 	// the scores computed  by Kubernetes scheduler. The total scores are used to do the host selection.
 	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
-	//CustomPrioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.CustomHostPriorityList, weight int, err error)
 	// Bind delegates the action of binding a pod to a node to the extender.
 	Bind(binding *v1.Binding) error
 

+ 1 - 1
kubernetes-v1.15.4/pkg/scheduler/algorithmprovider/defaults/register_priorities.go

@@ -80,7 +80,7 @@ func init() {
 	factory.RegisterPriorityFunction2(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)
 
 	// Prioritize nodes by custom function from custom metrics
-	factory.RegisterPriorityFunction2(priorities.CustomRequestedPriority, priorities.CustomRequestedPriorityMap, nil, 1000000)
+	factory.SocketRegisterPriorityFunction2(priorities.CustomRequestedPriority, priorities.CustomRequestedPriorityMap, nil, 1000000, true)
 
 	// Prioritizes nodes to help achieve balanced resource usage
 	factory.RegisterPriorityFunction2(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)

+ 63 - 3
kubernetes-v1.15.4/pkg/scheduler/core/generic_scheduler.go

@@ -220,7 +220,9 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 	metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
 	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
 
-	trace.Step("Prioritizing")
+	//trace.Step("Prioritizing")
+	trace.Step("Prioritizing Sockets")
+
 	startPriorityEvalTime := time.Now()
 	// When only one node after predicate, just use it.
 	if len(filteredNodes) == 1 {
@@ -234,8 +236,22 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 	}
 
 	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
-	priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
-	//priorityList, err := CustomPrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+
+	// default
+	//priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
+	// default
+
+	//start-custom
+	socketPrioritizers := []priorities.PriorityConfig{
+		{
+			Name:   priorities.CustomRequestedPriority,
+			Map:    priorities.CustomRequestedPriorityMap,
+			Weight: 100,
+		},
+	}
+
+	priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, socketPrioritizers, filteredNodes, g.extenders)
+	//end-custom
 	if err != nil {
 		return result, err
 	}
@@ -244,9 +260,34 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 	metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
 	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
 
+	// -----------------------------------------------------
+	// ------------------START-CUSTOM-----------------------
+	// -----------------------------------------------------
+	trace.Step("Selecting socket")
+	hosts, err := g.selectHostOnWinningSocket(priorityList)
+
+	//declare a subset of the snapshot of all available nodes
+	// create a new map, containing only the subset of the nodes
+	//var winningSocketNodes map[string]*schedulernodeinfo.NodeInfo
+	var winningSocketNodes []*v1.Node
+
+	for _, wn := range hosts {
+		for _, n := range filteredNodes {
+			if n.Name == wn {
+				winningSocketNodes = append(winningSocketNodes, n)
+			}
+		}
+	}
+
+	priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, winningSocketNodes, g.extenders)
+
+	// -----------------------------------------------------
+	// ------------------END-CUSTOM-----------------------
+	// -----------------------------------------------------
 	trace.Step("Selecting host")
 
 	host, err := g.selectHost(priorityList)
+
 	return ScheduleResult{
 		SuggestedHost:  host,
 		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
@@ -301,6 +342,24 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // ---------START OF CUSTOMIZATION----------------------------------------------------------------
 //------------------------------------------------------------------------------------------------
 //------------------------------------------------------------------------------------------------
+
+// 1st level of filtering
+// returns a list of nodes that belong to the winning socket
+func (g *genericScheduler) selectHostOnWinningSocket(priorityList schedulerapi.HostPriorityList) ([]string, error) {
+	var res []string
+	if len(priorityList) == 0 {
+		return res, fmt.Errorf("empty priorityList")
+	}
+
+	maxScores := findMaxScores(priorityList)
+	//ix := int(g.lastNodeIndex % uint64(len(maxScores)))
+	g.lastNodeIndex++
+	for _, idx := range maxScores {
+		res = append(res, priorityList[idx].Host)
+	}
+	return res, nil
+}
+
 // func (g *genericScheduler) customSelectHost(priorityList schedulerapi.CustomHostPriorityList) (string, error) {
 // 	if len(priorityList) == 0 {
 // 		return "", fmt.Errorf("empty priorityList")
@@ -744,6 +803,7 @@ func PrioritizeNodes(
 	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
 		nodeInfo := nodeNameToInfo[nodes[index].Name]
 		for i := range priorityConfigs {
+			// The Function is nil if there is no Map-Reduce functionality provided
 			if priorityConfigs[i].Function != nil {
 				continue
 			}

+ 17 - 2
kubernetes-v1.15.4/pkg/scheduler/factory/plugins.go

@@ -78,9 +78,10 @@ type PriorityConfigFactory struct {
 	Function          PriorityFunctionFactory
 	MapReduceFunction PriorityFunctionFactory2
 	Weight            int
+	isSocket          bool
 }
 
-// type CustomPriorityConfigFactory struct {
+// type SocketPriorityConfigFactory struct {
 // 	Function          CustomPriorityFunctionFactory
 // 	MapReduceFunction CustomPriorityFunctionFactory2
 // 	Weight            int
@@ -94,7 +95,7 @@ var (
 	mandatoryFitPredicates = sets.NewString()
 	priorityFunctionMap    = make(map[string]PriorityConfigFactory)
 	//custom
-	//customPriorityFunctionMap = make(map[string]CustomPriorityConfigFactory)
+	//socketPriorityFunctionMap = make(map[string]SocketPriorityConfigFactory)
 	//custom
 	algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
 
@@ -307,6 +308,20 @@ func RegisterPriorityFunction2(
 	})
 }
 
+func SocketRegisterPriorityFunction2(
+	name string,
+	mapFunction priorities.PriorityMapFunction,
+	reduceFunction priorities.PriorityReduceFunction,
+	weight int, isSocket bool) string {
+	return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
+		MapReduceFunction: func(PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
+			return mapFunction, reduceFunction
+		},
+		Weight:   weight,
+		isSocket: isSocket,
+	})
+}
+
 //------------------------------------------------------------------------------------------------
 //------------------------------------------------------------------------------------------------
 // ---------START OF CUSTOMIZATION----------------------------------------------------------------