- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package core
- import (
- "context"
- "fmt"
- "math"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "k8s.io/klog"
- "github.com/iwita/kube-scheduler/customcache"
- v1 "k8s.io/api/core/v1"
- policy "k8s.io/api/policy/v1beta1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/errors"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/scheduler/algorithm"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
- schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
- internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/kubernetes/pkg/scheduler/util"
- "k8s.io/kubernetes/pkg/scheduler/volumebinder"
- utiltrace "k8s.io/utils/trace"
- )
- const (
- // minFeasibleNodesToFind is the minimum number of nodes that would be scored
- // in each scheduling cycle. This is a semi-arbitrary value to ensure that a
- // certain minimum of nodes are checked for feasibility. This in turn helps
- // ensure a minimum level of spreading.
- minFeasibleNodesToFind = 100
- // minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
- // would be scored in each scheduling cycle. This is a semi-arbitrary value
- // to ensure that a certain minimum of nodes are checked for feasibility.
- // This in turn helps ensure a minimum level of spreading.
- minFeasibleNodesPercentageToFind = 5
- )
- var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
- predicates.ErrNodeSelectorNotMatch: {},
- predicates.ErrPodAffinityRulesNotMatch: {},
- predicates.ErrPodNotMatchHostName: {},
- predicates.ErrTaintsTolerationsNotMatch: {},
- predicates.ErrNodeLabelPresenceViolated: {},
- // Node conditions won't change when scheduler simulates removal of preemption victims.
- // So, it is pointless to try nodes that have not been able to host the pod due to node
- // conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
- predicates.ErrNodeNotReady: {},
- predicates.ErrNodeNetworkUnavailable: {},
- predicates.ErrNodeUnderDiskPressure: {},
- predicates.ErrNodeUnderPIDPressure: {},
- predicates.ErrNodeUnderMemoryPressure: {},
- predicates.ErrNodeUnschedulable: {},
- predicates.ErrNodeUnknownCondition: {},
- predicates.ErrVolumeZoneConflict: {},
- predicates.ErrVolumeNodeConflict: {},
- predicates.ErrVolumeBindConflict: {},
- }
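- // unresolvablePredicateExists (defined further below) consults this set: a node whose
- // predicate failures include any of these reasons is skipped as a preemption candidate,
- // because removing pods from that node cannot resolve the failure.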
- // FailedPredicateMap declares a map[string][]predicates.PredicateFailureReason type.
- type FailedPredicateMap map[string][]predicates.PredicateFailureReason
- // FitError describes a fit error of a pod.
- type FitError struct {
- Pod *v1.Pod
- NumAllNodes int
- FailedPredicates FailedPredicateMap
- }
- // ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
- var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
- const (
- // NoNodeAvailableMsg is used to format message when no nodes available.
- NoNodeAvailableMsg = "0/%v nodes are available"
- )
- // Error returns detailed information on why the pod failed to fit on each node.
- func (f *FitError) Error() string {
- reasons := make(map[string]int)
- for _, predicates := range f.FailedPredicates {
- for _, pred := range predicates {
- reasons[pred.GetReason()]++
- }
- }
- sortReasonsHistogram := func() []string {
- reasonStrings := []string{}
- for k, v := range reasons {
- reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
- }
- sort.Strings(reasonStrings)
- return reasonStrings
- }
- reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
- return reasonMsg
- }
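- // For illustration only (the reason strings and counts below are hypothetical): with
- // NumAllNodes = 5 and failed predicates aggregated as
- // {"Insufficient cpu": 3, "node(s) didn't match node selector": 2}, Error() would
- // produce a message of the form:
- //
- //   0/5 nodes are available: 2 node(s) didn't match node selector, 3 Insufficient cpu.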
- // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
- // onto machines.
- // TODO: Rename this type.
- type ScheduleAlgorithm interface {
- Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error)
- // Preempt receives scheduling errors for a pod and tries to create room for
- // the pod by preempting lower priority pods if possible.
- // It returns the node where preemption happened, a list of preempted pods, a
- // list of pods whose nominated node name should be removed, and error if any.
- Preempt(*v1.Pod, algorithm.NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
- // Predicates returns a map of the scheduler's predicate functions. This is
- // exposed for testing.
- Predicates() map[string]predicates.FitPredicate
- // Prioritizers returns a slice of priority config. This is exposed for
- // testing.
- Prioritizers() []priorities.PriorityConfig
- }
- // ScheduleResult represents the result of one pod scheduled. It will contain
- // the final selected Node, along with the selected intermediate information.
- type ScheduleResult struct {
- // Name of the host that the scheduler suggests.
- SuggestedHost string
- // Number of nodes the scheduler evaluated while scheduling the pod.
- EvaluatedNodes int
- // Number of nodes found feasible for the pod.
- FeasibleNodes int
- }
- type genericScheduler struct {
- cache internalcache.Cache
- schedulingQueue internalqueue.SchedulingQueue
- predicates map[string]predicates.FitPredicate
- priorityMetaProducer priorities.PriorityMetadataProducer
- predicateMetaProducer predicates.PredicateMetadataProducer
- prioritizers []priorities.PriorityConfig
- framework framework.Framework
- extenders []algorithm.SchedulerExtender
- lastNodeIndex uint64
- alwaysCheckAllPredicates bool
- nodeInfoSnapshot *internalcache.NodeInfoSnapshot
- volumeBinder *volumebinder.VolumeBinder
- pvcLister corelisters.PersistentVolumeClaimLister
- pdbLister algorithm.PDBLister
- disablePreemption bool
- percentageOfNodesToScore int32
- enableNonPreempting bool
- }
- // snapshot snapshots scheduler cache and node infos for all fit and priority
- // functions.
- func (g *genericScheduler) snapshot() error {
- // Used for all fit and priority funcs.
- return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
- }
- // Schedule tries to schedule the given pod to one of the nodes in the node list.
- // If it succeeds, it will return the name of the node.
- // If it fails, it will return a FitError error with reasons.
- func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
- trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
- defer trace.LogIfLong(100 * time.Millisecond)
- if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
- return result, err
- }
- nodes, err := nodeLister.List()
- if err != nil {
- return result, err
- }
- if len(nodes) == 0 {
- return result, ErrNoNodesAvailable
- }
- if err := g.snapshot(); err != nil {
- return result, err
- }
- trace.Step("Computing predicates")
- startPredicateEvalTime := time.Now()
- filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
- if err != nil {
- return result, err
- }
- if len(filteredNodes) == 0 {
- return result, &FitError{
- Pod: pod,
- NumAllNodes: len(nodes),
- FailedPredicates: failedPredicateMap,
- }
- }
- metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
- metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
- metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
- metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
- //trace.Step("Prioritizing")
- trace.Step("Prioritizing Sockets")
- startPriorityEvalTime := time.Now()
- // When only a single node remains after the predicates, just use it.
- if len(filteredNodes) == 1 {
- metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
- return ScheduleResult{
- SuggestedHost: filteredNodes[0].Name,
- EvaluatedNodes: 1 + len(failedPredicateMap),
- FeasibleNodes: 1,
- }, nil
- }
- metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
- // default
- //priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
- // default
- //start-custom
- socketPrioritizers := []priorities.PriorityConfig{
- {
- Name: priorities.CustomRequestedPriority,
- Map: priorities.CustomRequestedPriorityMap,
- Weight: 100,
- },
- }
- priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, socketPrioritizers, filteredNodes, g.extenders)
- //end-custom
- if err != nil {
- return result, err
- }
- metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
- metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- // -----------------------------------------------------
- // ------------------START-CUSTOM-----------------------
- // -----------------------------------------------------
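- // Overview of the custom selection flow below (a sketch of the data flow, assuming the
- // custom maps priorities.Sockets, priorities.Nodes, priorities.Cores and
- // priorities.Applications are populated as they are used further down):
- //
- //   1. selectHostOnWinningSocket picks the hosts that achieved the maximum
- //      socket-level score, i.e. the nodes of the winning socket.
- //   2. Those hosts are re-scored with NodeSelectionPriority and selectHost
- //      picks a single winner among them.
- //   3. The winner's socket and UUID are looked up, every node that shares them
- //      is collected, and the pod's application metrics are added to the cached
- //      metrics of those nodes via customcache.LabCache.AddAppMetrics.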
- trace.Step("Selecting socket")
- hosts, err := g.selectHostOnWinningSocket(priorityList)
- //declare a subset of the snapshot of all available nodes
- // create a new map, containing only the subset of the nodes
- //var winningSocketNodes map[string]*schedulernodeinfo.NodeInfo
- var winningSocketNodes []*v1.Node
- for _, wn := range hosts {
- for _, n := range filteredNodes {
- if n.Name == wn {
- winningSocketNodes = append(winningSocketNodes, n)
- }
- }
- }
- trace.Step("Selecting host")
- nodePrioritizers := []priorities.PriorityConfig{
- {
- Name: priorities.NodeSelectionPriority,
- Map: priorities.NodeSelectionPriorityMap,
- Weight: 100,
- },
- }
- priorityList, err = PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, nodePrioritizers, winningSocketNodes, g.extenders)
- // The winner host
- host, err := g.selectHost(priorityList)
- winningSocket := priorities.Sockets[host]
- winningUuid := priorities.Nodes[host]
- klog.Infof("Winning node: %v, Socket %v, UUID: %v", host, winningSocket, winningUuid)
- var tmp []string
- var socketNodes []string
- for key, val := range priorities.Nodes {
- if val == winningUuid {
- tmp = append(tmp, key)
- }
- }
- for _, n := range tmp {
- if priorities.Sockets[n] == winningSocket {
- socketNodes = append(socketNodes, n)
- }
- }
- // Add the pod's average metrics to the metrics of the winning socket's nodes and cache the result.
- podName := pod.ObjectMeta.Name
- for _, n := range socketNodes {
- klog.Infof("Update Score for Node %v, using App: %v", n, podName)
- klog.Infof("App metrics: %v", priorities.Applications[podName].Metrics)
- numCores := len(priorities.Cores[n])
- customcache.LabCache.AddAppMetrics(priorities.Applications[podName].Metrics, n, numCores)
- }
- // -----------------------------------------------------
- // ------------------END-CUSTOM-----------------------
- // -----------------------------------------------------
- //trace.Step("Selecting host")
- return ScheduleResult{
- SuggestedHost: host,
- EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
- FeasibleNodes: len(filteredNodes),
- }, err
- }
- // Prioritizers returns a slice containing all the scheduler's priority
- // functions and their config. It is exposed for testing only.
- func (g *genericScheduler) Prioritizers() []priorities.PriorityConfig {
- return g.prioritizers
- }
- // Predicates returns a map containing all the scheduler's predicate
- // functions. It is exposed for testing only.
- func (g *genericScheduler) Predicates() map[string]predicates.FitPredicate {
- return g.predicates
- }
- // findMaxScores returns the indexes of nodes in the "priorityList" that have the highest "Score".
- func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
- maxScoreIndexes := make([]int, 0, len(priorityList)/2)
- maxScore := priorityList[0].Score
- for i, hp := range priorityList {
- if hp.Score > maxScore {
- maxScore = hp.Score
- maxScoreIndexes = maxScoreIndexes[:0]
- maxScoreIndexes = append(maxScoreIndexes, i)
- } else if hp.Score == maxScore {
- maxScoreIndexes = append(maxScoreIndexes, i)
- }
- }
- return maxScoreIndexes
- }
- // selectHost takes a prioritized list of nodes and then picks one
- // in a round-robin manner from the nodes that had the highest score.
- func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
- if len(priorityList) == 0 {
- return "", fmt.Errorf("empty priorityList")
- }
- maxScores := findMaxScores(priorityList)
- ix := int(g.lastNodeIndex % uint64(len(maxScores)))
- g.lastNodeIndex++
- return priorityList[maxScores[ix]].Host, nil
- }
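- // Illustrative sketch (hypothetical scores): given a priorityList with scores
- // {A: 7, B: 9, C: 9}, findMaxScores returns the indexes of B and C. Successive
- // calls to selectHost then alternate between B and C, because lastNodeIndex is
- // incremented on every call:
- //
- //   host1, _ := g.selectHost(priorityList) // e.g. "B"
- //   host2, _ := g.selectHost(priorityList) // "C" on the next call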
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // ---------START OF CUSTOMIZATION----------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // selectHostOnWinningSocket is the first level of filtering: it returns the names
- // of the nodes that achieved the maximum score, i.e. the nodes of the winning socket.
- func (g *genericScheduler) selectHostOnWinningSocket(priorityList schedulerapi.HostPriorityList) ([]string, error) {
- var res []string
- if len(priorityList) == 0 {
- return res, fmt.Errorf("empty priorityList")
- }
- maxScores := findMaxScores(priorityList)
- //ix := int(g.lastNodeIndex % uint64(len(maxScores)))
- g.lastNodeIndex++
- for _, idx := range maxScores {
- res = append(res, priorityList[idx].Host)
- }
- return res, nil
- }
- // func (g *genericScheduler) customSelectHost(priorityList schedulerapi.CustomHostPriorityList) (string, error) {
- // if len(priorityList) == 0 {
- // return "", fmt.Errorf("empty priorityList")
- // }
- // maxScores := findMaxScores(priorityList)
- // ix := int(g.lastNodeIndex % uint64(len(maxScores)))
- // g.lastNodeIndex++
- // return priorityList[maxScores[ix]].Host, nil
- // }
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // ---------END OF CUSTOMIZATION----------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- //------------------------------------------------------------------------------------------------
- // preempt finds nodes with pods that can be preempted to make room for "pod" to
- // schedule. It chooses one of the nodes and preempts the pods on the node and
- // returns 1) the node, 2) the list of preempted pods if such a node is found,
- // 3) A list of pods whose nominated node name should be cleared, and 4) any
- // possible error.
- // Preempt does not update its snapshot. It uses the same snapshot used in the
- // scheduling cycle. This is to avoid a scenario where preempt finds feasible
- // nodes without preempting any pod. When there are many pending pods in the
- // scheduling queue, a nominated pod will go back to the queue, behind
- // other pods with the same priority. The nominated pod prevents other pods from
- // using the nominated resources and the nominated pod could take a long time
- // before it is retried after many other pending pods.
- func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
- // Scheduler may return various types of errors. Consider preemption only if
- // the error is of type FitError.
- fitError, ok := scheduleErr.(*FitError)
- if !ok || fitError == nil {
- return nil, nil, nil, nil
- }
- if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
- klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
- return nil, nil, nil, nil
- }
- allNodes, err := nodeLister.List()
- if err != nil {
- return nil, nil, nil, err
- }
- if len(allNodes) == 0 {
- return nil, nil, nil, ErrNoNodesAvailable
- }
- potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
- if len(potentialNodes) == 0 {
- klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
- // In this case, we should clean-up any existing nominated node name of the pod.
- return nil, nil, []*v1.Pod{pod}, nil
- }
- pdbs, err := g.pdbLister.List(labels.Everything())
- if err != nil {
- return nil, nil, nil, err
- }
- nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
- g.predicateMetaProducer, g.schedulingQueue, pdbs)
- if err != nil {
- return nil, nil, nil, err
- }
- // We will only check nodeToVictims with extenders that support preemption.
- // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
- // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
- nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
- if err != nil {
- return nil, nil, nil, err
- }
- candidateNode := pickOneNodeForPreemption(nodeToVictims)
- if candidateNode == nil {
- return nil, nil, nil, nil
- }
- // Lower priority pods nominated to run on this node may no longer fit on
- // this node. So, we should remove their nomination. Removing their
- // nomination updates these pods and moves them to the active queue. It
- // lets scheduler find another place for them.
- nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
- if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
- return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
- }
- return nil, nil, nil, fmt.Errorf(
- "preemption failed: the target node %s has been deleted from scheduler cache",
- candidateNode.Name)
- }
- // processPreemptionWithExtenders processes preemption with extenders
- func (g *genericScheduler) processPreemptionWithExtenders(
- pod *v1.Pod,
- nodeToVictims map[*v1.Node]*schedulerapi.Victims,
- ) (map[*v1.Node]*schedulerapi.Victims, error) {
- if len(nodeToVictims) > 0 {
- for _, extender := range g.extenders {
- if extender.SupportsPreemption() && extender.IsInterested(pod) {
- newNodeToVictims, err := extender.ProcessPreemption(
- pod,
- nodeToVictims,
- g.nodeInfoSnapshot.NodeInfoMap,
- )
- if err != nil {
- if extender.IsIgnorable() {
- klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
- extender, err)
- continue
- }
- return nil, err
- }
- // Replace nodeToVictims with the new result after preemption, so the
- // rest of the extenders can continue using it as a parameter.
- nodeToVictims = newNodeToVictims
- // If node list becomes empty, no preemption can happen regardless of other extenders.
- if len(nodeToVictims) == 0 {
- break
- }
- }
- }
- }
- return nodeToVictims, nil
- }
- // getLowerPriorityNominatedPods returns pods whose priority is smaller than the
- // priority of the given "pod" and are nominated to run on the given node.
- // Note: We could possibly check if the nominated lower priority pods still fit
- // and return those that no longer fit, but that would require lots of
- // manipulation of NodeInfo and PredicateMeta per nominated pod. It may not be
- // worth the complexity, especially because we generally expect to have a very
- // small number of nominated pods per node.
- func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
- pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
- if len(pods) == 0 {
- return nil
- }
- var lowerPriorityPods []*v1.Pod
- podPriority := util.GetPodPriority(pod)
- for _, p := range pods {
- if util.GetPodPriority(p) < podPriority {
- lowerPriorityPods = append(lowerPriorityPods, p)
- }
- }
- return lowerPriorityPods
- }
- // numFeasibleNodesToFind returns the number of feasible nodes that, once found,
- // makes the scheduler stop searching for more feasible nodes.
- func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
- if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
- return numAllNodes
- }
- adaptivePercentage := g.percentageOfNodesToScore
- if adaptivePercentage <= 0 {
- adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
- if adaptivePercentage < minFeasibleNodesPercentageToFind {
- adaptivePercentage = minFeasibleNodesPercentageToFind
- }
- }
- numNodes = numAllNodes * adaptivePercentage / 100
- if numNodes < minFeasibleNodesToFind {
- return minFeasibleNodesToFind
- }
- return numNodes
- }
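- // Worked example (assuming schedulerapi.DefaultPercentageOfNodesToScore is 50):
- // with percentageOfNodesToScore unset (<= 0) and a 5000-node cluster, the adaptive
- // percentage is 50 - 5000/125 = 10, so 5000 * 10 / 100 = 500 nodes are checked for
- // feasibility. Clusters with fewer than minFeasibleNodesToFind (100) nodes, or a
- // configured percentage of 100 or more, always check every node.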
- // findNodesThatFit filters the nodes to find the ones that fit the pod based on the
- // given predicate functions. Each node is passed through the predicates to determine whether it is a fit.
- func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
- var filtered []*v1.Node
- failedPredicateMap := FailedPredicateMap{}
- if len(g.predicates) == 0 {
- filtered = nodes
- } else {
- allNodes := int32(g.cache.NodeTree().NumNodes())
- numNodesToFind := g.numFeasibleNodesToFind(allNodes)
- // Create filtered list with enough space to avoid growing it
- // and allow assigning.
- filtered = make([]*v1.Node, numNodesToFind)
- errs := errors.MessageCountMap{}
- var (
- predicateResultLock sync.Mutex
- filteredLen int32
- )
- ctx, cancel := context.WithCancel(context.Background())
- // We can use the same metadata producer for all nodes.
- meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
- checkNode := func(i int) {
- nodeName := g.cache.NodeTree().Next()
- fits, failedPredicates, err := podFitsOnNode(
- pod,
- meta,
- g.nodeInfoSnapshot.NodeInfoMap[nodeName],
- g.predicates,
- g.schedulingQueue,
- g.alwaysCheckAllPredicates,
- )
- if err != nil {
- predicateResultLock.Lock()
- errs[err.Error()]++
- predicateResultLock.Unlock()
- return
- }
- if fits {
- length := atomic.AddInt32(&filteredLen, 1)
- if length > numNodesToFind {
- cancel()
- atomic.AddInt32(&filteredLen, -1)
- } else {
- filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
- }
- } else {
- predicateResultLock.Lock()
- failedPredicateMap[nodeName] = failedPredicates
- predicateResultLock.Unlock()
- }
- }
- // Stop searching for more nodes once the configured number of feasible nodes
- // is found.
- workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
- filtered = filtered[:filteredLen]
- if len(errs) > 0 {
- return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
- }
- }
- if len(filtered) > 0 && len(g.extenders) != 0 {
- for _, extender := range g.extenders {
- if !extender.IsInterested(pod) {
- continue
- }
- filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
- if err != nil {
- if extender.IsIgnorable() {
- klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
- extender, err)
- continue
- } else {
- return []*v1.Node{}, FailedPredicateMap{}, err
- }
- }
- for failedNodeName, failedMsg := range failedMap {
- if _, found := failedPredicateMap[failedNodeName]; !found {
- failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
- }
- failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
- }
- filtered = filteredList
- if len(filtered) == 0 {
- break
- }
- }
- }
- return filtered, failedPredicateMap, nil
- }
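- // Note on the filtering above: checkNode is run for up to allNodes nodes with 16
- // workers (workqueue.ParallelizeUntil); once filteredLen exceeds numNodesToFind, the
- // context is cancelled so the remaining workers stop early, and the extender filters
- // are then applied only to the nodes that passed the predicates.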
- // addNominatedPods adds pods with equal or greater priority which are nominated
- // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
- // any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
- func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
- nodeInfo *schedulernodeinfo.NodeInfo, queue internalqueue.SchedulingQueue) (bool, predicates.PredicateMetadata,
- *schedulernodeinfo.NodeInfo) {
- if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
- // This may happen only in tests.
- return false, meta, nodeInfo
- }
- nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
- if len(nominatedPods) == 0 {
- return false, meta, nodeInfo
- }
- var metaOut predicates.PredicateMetadata
- if meta != nil {
- metaOut = meta.ShallowCopy()
- }
- nodeInfoOut := nodeInfo.Clone()
- for _, p := range nominatedPods {
- if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
- nodeInfoOut.AddPod(p)
- if metaOut != nil {
- metaOut.AddPod(p, nodeInfoOut)
- }
- }
- }
- return true, metaOut, nodeInfoOut
- }
- // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
- // For a given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
- // predicate results where possible.
- // This function is called from two different places: Schedule and Preempt.
- // When it is called from Schedule, we want to test whether the pod is schedulable
- // on the node with all the existing pods on the node plus higher and equal priority
- // pods nominated to run on the node.
- // When it is called from Preempt, we should remove the victims of preemption and
- // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
- // It removes victims from meta and NodeInfo before calling this function.
- func podFitsOnNode(
- pod *v1.Pod,
- meta predicates.PredicateMetadata,
- info *schedulernodeinfo.NodeInfo,
- predicateFuncs map[string]predicates.FitPredicate,
- queue internalqueue.SchedulingQueue,
- alwaysCheckAllPredicates bool,
- ) (bool, []predicates.PredicateFailureReason, error) {
- var failedPredicates []predicates.PredicateFailureReason
- podsAdded := false
- // We run predicates twice in some cases. If the node has greater or equal priority
- // nominated pods, we run them when those pods are added to meta and nodeInfo.
- // If all predicates succeed in this pass, we run them again when these
- // nominated pods are not added. This second pass is necessary because some
- // predicates such as inter-pod affinity may not pass without the nominated pods.
- // If there are no nominated pods for the node or if the first run of the
- // predicates fail, we don't run the second pass.
- // We consider only equal or higher priority pods in the first pass, because
- // those are the pods that the current "pod" must yield to; it should not take a space
- // opened for running them. It is ok if the current "pod" takes resources freed for
- // lower priority pods.
- // Requiring that the new pod is schedulable in both circumstances ensures that
- // we are making a conservative decision: predicates like resources and inter-pod
- // anti-affinity are more likely to fail when the nominated pods are treated
- // as running, while predicates like pod affinity are more likely to fail when
- // the nominated pods are treated as not running. We can't just assume the
- // nominated pods are running because they are not running right now and in fact,
- // they may end up getting scheduled to a different node.
- for i := 0; i < 2; i++ {
- metaToUse := meta
- nodeInfoToUse := info
- if i == 0 {
- podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
- } else if !podsAdded || len(failedPredicates) != 0 {
- break
- }
- for _, predicateKey := range predicates.Ordering() {
- var (
- fit bool
- reasons []predicates.PredicateFailureReason
- err error
- )
- //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
- if predicate, exist := predicateFuncs[predicateKey]; exist {
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
- if err != nil {
- return false, []predicates.PredicateFailureReason{}, err
- }
- if !fit {
- // The predicate result is unfit; record the failure reasons.
- failedPredicates = append(failedPredicates, reasons...)
- // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
- if !alwaysCheckAllPredicates {
- klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
- "evaluation is short circuited and there are chances " +
- "of other predicates failing as well.")
- break
- }
- }
- }
- }
- }
- return len(failedPredicates) == 0, failedPredicates, nil
- }
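- // Illustrative sketch (hypothetical pods): if pod P (priority 10) is being scheduled
- // onto a node that has a nominated pod N (priority 20), the first pass runs the
- // predicates with N added to the node's info, and the second pass runs them again
- // without N. P is reported as fitting only if both passes succeed; a nominated pod
- // with priority 5 would not be added in the first pass at all.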
- // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
- // Each priority function is expected to set a score of 0-10, where 0 is the lowest
- // priority score (least preferred node) and 10 is the highest.
- // Each priority function can also have its own weight.
- // The node scores returned by each priority function are multiplied by its weight to get weighted scores.
- // Finally, all weighted scores are added up to get the total score of each node.
- func PrioritizeNodes(
- pod *v1.Pod,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- meta interface{},
- priorityConfigs []priorities.PriorityConfig,
- nodes []*v1.Node,
- extenders []algorithm.SchedulerExtender,
- ) (schedulerapi.HostPriorityList, error) {
- // If no priority configs are provided, then the EqualPriority function is applied.
- // This is required to generate the priority list in the required format.
- if len(priorityConfigs) == 0 && len(extenders) == 0 {
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- for i := range nodes {
- hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
- if err != nil {
- return nil, err
- }
- result = append(result, hostPriority)
- }
- return result, nil
- }
- var (
- mu = sync.Mutex{}
- wg = sync.WaitGroup{}
- errs []error
- )
- appendError := func(err error) {
- mu.Lock()
- defer mu.Unlock()
- errs = append(errs, err)
- }
- results := make([]schedulerapi.HostPriorityList, len(priorityConfigs))
- // DEPRECATED: we can remove this when all priorityConfigs implement the
- // Map-Reduce pattern.
- for i := range priorityConfigs {
- if priorityConfigs[i].Function != nil {
- wg.Add(1)
- go func(index int) {
- defer wg.Done()
- var err error
- results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
- if err != nil {
- appendError(err)
- }
- }(i)
- } else {
- results[i] = make(schedulerapi.HostPriorityList, len(nodes))
- }
- }
- workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
- nodeInfo := nodeNameToInfo[nodes[index].Name]
- for i := range priorityConfigs {
- // Configs with a legacy Function were already scored above; only Map-based configs are handled here.
- if priorityConfigs[i].Function != nil {
- continue
- }
- var err error
- results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
- if err != nil {
- appendError(err)
- results[i][index].Host = nodes[index].Name
- }
- }
- })
- for i := range priorityConfigs {
- if priorityConfigs[i].Reduce == nil {
- continue
- }
- wg.Add(1)
- go func(index int) {
- defer wg.Done()
- if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
- appendError(err)
- }
- if klog.V(10) {
- for _, hostPriority := range results[index] {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
- }
- }
- }(i)
- }
- // Wait for all computations to be finished.
- wg.Wait()
- if len(errs) != 0 {
- return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
- }
- // Summarize all scores.
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- for i := range nodes {
- result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
- for j := range priorityConfigs {
- result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
- }
- }
- if len(extenders) != 0 && nodes != nil {
- combinedScores := make(map[string]float64, len(nodeNameToInfo))
- for i := range extenders {
- if !extenders[i].IsInterested(pod) {
- continue
- }
- wg.Add(1)
- go func(extIndex int) {
- defer wg.Done()
- prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
- if err != nil {
- // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
- return
- }
- mu.Lock()
- for i := range *prioritizedList {
- host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
- if klog.V(10) {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
- }
- combinedScores[host] += score * float64(weight)
- }
- mu.Unlock()
- }(i)
- }
- // Wait for all goroutines to finish.
- wg.Wait()
- for i := range result {
- result[i].Score += combinedScores[result[i].Host]
- }
- }
- if klog.V(10) {
- for i := range result {
- klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
- }
- }
- return result, nil
- }
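- // Worked example (hypothetical numbers): with two priority configs, one of weight 1
- // scoring node N at 8 and one of weight 2 scoring N at 5, the summarized score for N
- // is 8*1 + 5*2 = 18. If an extender with weight 3 additionally returns 4 for N, the
- // final score becomes 18 + 4*3 = 30.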
- //------------------------------------------------------------------
- //-------------------START-CUSTOM-BY-IWITA---------------------------------------------------------
- //------------------------------------------------------------------
- // func CustomPrioritizeNodes(
- // pod *v1.Pod,
- // nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- // meta interface{},
- // priorityConfigs []priorities.PriorityConfig,
- // nodes []*v1.Node,
- // extenders []algorithm.SchedulerExtender,
- // ) (schedulerapi.CustomHostPriorityList, error) {
- // // If no priority configs are provided, then the EqualPriority function is applied
- // // This is required to generate the priority list in the required format
- // // if len(priorityConfigs) == 0 && len(extenders) == 0 {
- // // result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
- // // for i := range nodes {
- // // // initializes nodes with Score = 1
- // // hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
- // // if err != nil {
- // // return nil, err
- // // }
- // // result = append(result, hostPriority)
- // // }
- // // return result, nil
- // // }
- // var (
- // mu = sync.Mutex{}
- // wg = sync.WaitGroup{}
- // errs []error
- // )
- // appendError := func(err error) {
- // mu.Lock()
- // defer mu.Unlock()
- // errs = append(errs, err)
- // }
- // results := make([]schedulerapi.CustomHostPriorityList, len(priorityConfigs), len(priorityConfigs))
- // // DEPRECATED: we can remove this when all priorityConfigs implement the
- // // Map-Reduce pattern.
- // for i := range priorityConfigs {
- // if priorityConfigs[i].CustomFunction != nil {
- // wg.Add(1)
- // go func(index int) {
- // defer wg.Done()
- // var err error
- // results[index], err = priorityConfigs[index].CustomFunction(pod, nodeNameToInfo, nodes)
- // if err != nil {
- // appendError(err)
- // }
- // }(i)
- // } else {
- // results[i] = make(schedulerapi.CustomHostPriorityList, len(nodes))
- // }
- // }
- // workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
- // nodeInfo := nodeNameToInfo[nodes[index].Name]
- // for i := range priorityConfigs {
- // if priorityConfigs[i].Function != nil {
- // continue
- // }
- // var err error
- // results[i][index], err = priorityConfigs[i].CustomMap(pod, meta, nodeInfo)
- // if err != nil {
- // appendError(err)
- // results[i][index].Host = nodes[index].Name
- // }
- // }
- // })
- // for i := range priorityConfigs {
- // if priorityConfigs[i].Reduce == nil {
- // continue
- // }
- // wg.Add(1)
- // go func(index int) {
- // defer wg.Done()
- // if err := priorityConfigs[index].CustomReduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
- // appendError(err)
- // }
- // if klog.V(10) {
- // for _, hostPriority := range results[index] {
- // klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
- // }
- // }
- // }(i)
- // }
- // // Wait for all computations to be finished.
- // wg.Wait()
- // if len(errs) != 0 {
- // return schedulerapi.CustomHostPriorityList{}, errors.NewAggregate(errs)
- // }
- // // Summarize all scores.
- // result := make(schedulerapi.CustomHostPriorityList, 0, len(nodes))
- // for i := range nodes {
- // result = append(result, schedulerapi.CustomHostPriority{Host: nodes[i].Name, Score: 0})
- // for j := range priorityConfigs {
- // result[i].Score += results[j][i].Score * float64(priorityConfigs[j].Weight)
- // }
- // }
- // if len(extenders) != 0 && nodes != nil {
- // combinedScores := make(map[string]float64, len(nodeNameToInfo))
- // for i := range extenders {
- // if !extenders[i].IsInterested(pod) {
- // continue
- // }
- // wg.Add(1)
- // go func(extIndex int) {
- // defer wg.Done()
- // prioritizedList, weight, err := extenders[extIndex].CustomPrioritize(pod, nodes)
- // if err != nil {
- // // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
- // return
- // }
- // mu.Lock()
- // for i := range *prioritizedList {
- // host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
- // if klog.V(10) {
- // klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
- // }
- // combinedScores[host] += score * float64(weight)
- // }
- // mu.Unlock()
- // }(i)
- // }
- // // wait for all go routines to finish
- // wg.Wait()
- // for i := range result {
- // result[i].Score += combinedScores[result[i].Host]
- // }
- // }
- // if klog.V(10) {
- // for i := range result {
- // klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
- // }
- // }
- // return result, nil
- // }
- //------------------------------------------------------------------
- // --------------END-CUSTOM-BY-IWITA--------------------------------
- //------------------------------------------------------------------
- // EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
- func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
- node := nodeInfo.Node()
- if node == nil {
- return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
- }
- return schedulerapi.HostPriority{
- Host: node.Name,
- Score: 1,
- }, nil
- }
- // pickOneNodeForPreemption chooses one node among the given nodes. It assumes
- // pods in each map entry are ordered by decreasing priority.
- // It picks a node based on the following criteria:
- // 1. A node with minimum number of PDB violations.
- // 2. A node with minimum highest priority victim is picked.
- // 3. Ties are broken by sum of priorities of all victims.
- // 4. If there are still ties, node with the minimum number of victims is picked.
- // 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
- // 6. If there are still ties, the first such node is picked (sort of randomly).
- // The 'minNodes1' and 'minNodes2' slices are reused here to save memory
- // allocation and garbage collection time.
- func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
- if len(nodesToVictims) == 0 {
- return nil
- }
- minNumPDBViolatingPods := math.MaxInt32
- var minNodes1 []*v1.Node
- lenNodes1 := 0
- for node, victims := range nodesToVictims {
- if len(victims.Pods) == 0 {
- // We found a node that doesn't need any preemption. Return it!
- // This should happen rarely when one or more pods are terminated between
- // the time that scheduler tries to schedule the pod and the time that
- // preemption logic tries to find nodes for preemption.
- return node
- }
- numPDBViolatingPods := victims.NumPDBViolations
- if numPDBViolatingPods < minNumPDBViolatingPods {
- minNumPDBViolatingPods = numPDBViolatingPods
- minNodes1 = nil
- lenNodes1 = 0
- }
- if numPDBViolatingPods == minNumPDBViolatingPods {
- minNodes1 = append(minNodes1, node)
- lenNodes1++
- }
- }
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
- // There is more than one node with the minimum number of PDB-violating pods. Find
- // the one with the minimum highest-priority victim.
- minHighestPriority := int32(math.MaxInt32)
- var minNodes2 = make([]*v1.Node, lenNodes1)
- lenNodes2 := 0
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- victims := nodesToVictims[node]
- // highestPodPriority is the highest priority among the victims on this node.
- highestPodPriority := util.GetPodPriority(victims.Pods[0])
- if highestPodPriority < minHighestPriority {
- minHighestPriority = highestPodPriority
- lenNodes2 = 0
- }
- if highestPodPriority == minHighestPriority {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- if lenNodes2 == 1 {
- return minNodes2[0]
- }
- // There are a few nodes with minimum highest priority victim. Find the
- // smallest sum of priorities.
- minSumPriorities := int64(math.MaxInt64)
- lenNodes1 = 0
- for i := 0; i < lenNodes2; i++ {
- var sumPriorities int64
- node := minNodes2[i]
- for _, pod := range nodesToVictims[node].Pods {
- // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
- // needed so that a node with a few pods with negative priority is not
- // picked over a node with a smaller number of pods with the same negative
- // priority (and similar scenarios).
- sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
- }
- if sumPriorities < minSumPriorities {
- minSumPriorities = sumPriorities
- lenNodes1 = 0
- }
- if sumPriorities == minSumPriorities {
- minNodes1[lenNodes1] = node
- lenNodes1++
- }
- }
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
- // There are a few nodes with minimum highest priority victim and sum of priorities.
- // Find one with the minimum number of pods.
- minNumPods := math.MaxInt32
- lenNodes2 = 0
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- numPods := len(nodesToVictims[node].Pods)
- if numPods < minNumPods {
- minNumPods = numPods
- lenNodes2 = 0
- }
- if numPods == minNumPods {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- if lenNodes2 == 1 {
- return minNodes2[0]
- }
- // There are a few nodes with the same number of pods.
- // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
- latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
- if latestStartTime == nil {
- // If the earliest start time of all pods on the 1st node is nil, just return it,
- // which is not expected to happen.
- klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
- return minNodes2[0]
- }
- nodeToReturn := minNodes2[0]
- for i := 1; i < lenNodes2; i++ {
- node := minNodes2[i]
- // Get earliest start time of all pods on the current node.
- earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
- if earliestStartTimeOnNode == nil {
- klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
- continue
- }
- if earliestStartTimeOnNode.After(latestStartTime.Time) {
- latestStartTime = earliestStartTimeOnNode
- nodeToReturn = node
- }
- }
- return nodeToReturn
- }
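- // Illustrative sketch (hypothetical victims): suppose nodes N1 and N2 both have zero
- // PDB violations. N1's victims have priorities {9, 3} and N2's have {7, 7}. The
- // minimum "highest priority victim" rule prefers N2 (7 < 9), so the later
- // tie-breakers (sum of priorities, number of victims, start times) are not reached.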
- // selectNodesForPreemption finds all the nodes with possible victims for
- // preemption in parallel.
- func selectNodesForPreemption(pod *v1.Pod,
- nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
- potentialNodes []*v1.Node,
- fitPredicates map[string]predicates.FitPredicate,
- metadataProducer predicates.PredicateMetadataProducer,
- queue internalqueue.SchedulingQueue,
- pdbs []*policy.PodDisruptionBudget,
- ) (map[*v1.Node]*schedulerapi.Victims, error) {
- nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
- var resultLock sync.Mutex
- // We can use the same metadata producer for all nodes.
- meta := metadataProducer(pod, nodeNameToInfo)
- checkNode := func(i int) {
- nodeName := potentialNodes[i].Name
- var metaCopy predicates.PredicateMetadata
- if meta != nil {
- metaCopy = meta.ShallowCopy()
- }
- pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
- if fits {
- resultLock.Lock()
- victims := schedulerapi.Victims{
- Pods: pods,
- NumPDBViolations: numPDBViolations,
- }
- nodeToVictims[potentialNodes[i]] = &victims
- resultLock.Unlock()
- }
- }
- workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
- return nodeToVictims, nil
- }
- // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
- // and "nonViolatingPods" based on whether their PDBs will be violated if they are
- // preempted.
- // This function is stable and does not change the order of received pods. So, if it
- // receives a sorted list, grouping will preserve the order of the input list.
- func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
- for _, obj := range pods {
- pod := obj.(*v1.Pod)
- pdbForPodIsViolated := false
- // A pod with no labels will not match any PDB. So, no need to check.
- if len(pod.Labels) != 0 {
- for _, pdb := range pdbs {
- if pdb.Namespace != pod.Namespace {
- continue
- }
- selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
- if err != nil {
- continue
- }
- // A PDB with a nil or empty selector matches nothing.
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
- continue
- }
- // We have found a matching PDB.
- if pdb.Status.PodDisruptionsAllowed <= 0 {
- pdbForPodIsViolated = true
- break
- }
- }
- }
- if pdbForPodIsViolated {
- violatingPods = append(violatingPods, pod)
- } else {
- nonViolatingPods = append(nonViolatingPods, pod)
- }
- }
- return violatingPods, nonViolatingPods
- }
- // selectVictimsOnNode finds minimum set of pods on the given node that should
- // be preempted in order to make enough room for "pod" to be scheduled. The
- // minimum set selected is subject to the constraint that a higher-priority pod
- // is never preempted when a lower-priority pod could be (higher/lower relative
- // to one another, not relative to the preemptor "pod").
- // The algorithm first checks if the pod can be scheduled on the node when all the
- // lower priority pods are gone. If so, it sorts all the lower priority pods by
- // their priority and then puts them into two groups of those whose PodDisruptionBudget
- // will be violated if preempted and other non-violating pods. Both groups are
- // sorted by priority. It first tries to reprieve as many PDB violating pods as
- // possible and then does the same for non-PDB-violating pods while checking
- // that the "pod" can still fit on the node.
- // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
- // due to pod affinity, node affinity, or node anti-affinity reasons. None of
- // these predicates can be satisfied by removing more pods from the node.
- func selectVictimsOnNode(
- pod *v1.Pod,
- meta predicates.PredicateMetadata,
- nodeInfo *schedulernodeinfo.NodeInfo,
- fitPredicates map[string]predicates.FitPredicate,
- queue internalqueue.SchedulingQueue,
- pdbs []*policy.PodDisruptionBudget,
- ) ([]*v1.Pod, int, bool) {
- if nodeInfo == nil {
- return nil, 0, false
- }
- potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
- nodeInfoCopy := nodeInfo.Clone()
- removePod := func(rp *v1.Pod) {
- nodeInfoCopy.RemovePod(rp)
- if meta != nil {
- meta.RemovePod(rp)
- }
- }
- addPod := func(ap *v1.Pod) {
- nodeInfoCopy.AddPod(ap)
- if meta != nil {
- meta.AddPod(ap, nodeInfoCopy)
- }
- }
- // As the first step, remove all the lower priority pods from the node and
- // check if the given pod can be scheduled.
- podPriority := util.GetPodPriority(pod)
- for _, p := range nodeInfoCopy.Pods() {
- if util.GetPodPriority(p) < podPriority {
- potentialVictims.Items = append(potentialVictims.Items, p)
- removePod(p)
- }
- }
- // If the new pod does not fit after removing all the lower priority pods,
- // we are almost done and this node is not suitable for preemption. The only
- // condition that we could check is if the "pod" is failing to schedule due to
- // inter-pod affinity to one or more victims, but we have decided not to
- // support this case for performance reasons. Having affinity to lower
- // priority pods is not a recommended configuration anyway.
- if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
- if err != nil {
- klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
- }
- return nil, 0, false
- }
- var victims []*v1.Pod
- numViolatingVictim := 0
- potentialVictims.Sort()
- // Try to reprieve as many pods as possible. We first try to reprieve the PDB
- // violating victims and then other non-violating ones. In both cases, we start
- // from the highest priority victims.
- violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
- reprievePod := func(p *v1.Pod) bool {
- addPod(p)
- fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false)
- if !fits {
- removePod(p)
- victims = append(victims, p)
- klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
- }
- return fits
- }
- for _, p := range violatingVictims {
- if !reprievePod(p) {
- numViolatingVictim++
- }
- }
- // Now we try to reprieve non-violating victims.
- for _, p := range nonViolatingVictims {
- reprievePod(p)
- }
- return victims, numViolatingVictim, true
- }
- // unresolvablePredicateExists checks whether failedPredicates contains an unresolvable predicate failure.
- func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureReason) bool {
- for _, failedPredicate := range failedPredicates {
- if _, ok := unresolvablePredicateFailureErrors[failedPredicate]; ok {
- return true
- }
- }
- return false
- }
- // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
- // that may be satisfied by removing pods from the node.
- func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
- potentialNodes := []*v1.Node{}
- for _, node := range nodes {
- failedPredicates, _ := failedPredicatesMap[node.Name]
- // If we assume that scheduler looks at all nodes and populates the failedPredicateMap
- // (which is the case today), the !found case should never happen, but we'd prefer
- // to rely less on such assumptions in the code when checking does not impose
- // significant overhead.
- // Also, we currently assume all failures returned by extender as resolvable.
- if !unresolvablePredicateExists(failedPredicates) {
- klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
- potentialNodes = append(potentialNodes, node)
- }
- }
- return potentialNodes
- }
- // podEligibleToPreemptOthers determines whether this pod should be considered
- // for preempting other pods or not. If this pod has already preempted other
- // pods and those are in their graceful termination period, it shouldn't be
- // considered for preemption.
- // We look at the node that is nominated for this pod and as long as there are
- // terminating pods on the node, we don't consider this for preempting more pods.
- func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
- if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
- klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
- return false
- }
- nomNodeName := pod.Status.NominatedNodeName
- if len(nomNodeName) > 0 {
- if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
- podPriority := util.GetPodPriority(pod)
- for _, p := range nodeInfo.Pods() {
- if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
- // There is a terminating pod on the nominated node.
- return false
- }
- }
- }
- }
- return true
- }
- // podPassesBasicChecks performs basic sanity checks to determine whether the pod can be scheduled.
- func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
- // Check PVCs used by the pod
- namespace := pod.Namespace
- manifest := &(pod.Spec)
- for i := range manifest.Volumes {
- volume := &manifest.Volumes[i]
- if volume.PersistentVolumeClaim == nil {
- // Volume is not a PVC, ignore
- continue
- }
- pvcName := volume.PersistentVolumeClaim.ClaimName
- pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
- if err != nil {
- // The error already has enough context ("persistentvolumeclaim "myclaim" not found")
- return err
- }
- if pvc.DeletionTimestamp != nil {
- return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
- }
- }
- return nil
- }
- // NewGenericScheduler creates a genericScheduler object.
- func NewGenericScheduler(
- cache internalcache.Cache,
- podQueue internalqueue.SchedulingQueue,
- predicates map[string]predicates.FitPredicate,
- predicateMetaProducer predicates.PredicateMetadataProducer,
- prioritizers []priorities.PriorityConfig,
- priorityMetaProducer priorities.PriorityMetadataProducer,
- framework framework.Framework,
- extenders []algorithm.SchedulerExtender,
- volumeBinder *volumebinder.VolumeBinder,
- pvcLister corelisters.PersistentVolumeClaimLister,
- pdbLister algorithm.PDBLister,
- alwaysCheckAllPredicates bool,
- disablePreemption bool,
- percentageOfNodesToScore int32,
- enableNonPreempting bool,
- ) ScheduleAlgorithm {
- return &genericScheduler{
- cache: cache,
- schedulingQueue: podQueue,
- predicates: predicates,
- predicateMetaProducer: predicateMetaProducer,
- prioritizers: prioritizers,
- priorityMetaProducer: priorityMetaProducer,
- framework: framework,
- extenders: extenders,
- nodeInfoSnapshot: framework.NodeInfoSnapshot(),
- volumeBinder: volumeBinder,
- pvcLister: pvcLister,
- pdbLister: pdbLister,
- alwaysCheckAllPredicates: alwaysCheckAllPredicates,
- disablePreemption: disablePreemption,
- percentageOfNodesToScore: percentageOfNodesToScore,
- enableNonPreempting: enableNonPreempting,
- }
- }