- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package core
- import (
- "context"
- "fmt"
- "math"
- "math/rand"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- policy "k8s.io/api/policy/v1beta1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- corelisters "k8s.io/client-go/listers/core/v1"
- policylisters "k8s.io/client-go/listers/policy/v1beta1"
- "k8s.io/client-go/util/workqueue"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
- framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
- internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
- internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
- "k8s.io/kubernetes/pkg/scheduler/listers"
- "k8s.io/kubernetes/pkg/scheduler/metrics"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/kubernetes/pkg/scheduler/util"
- "k8s.io/kubernetes/pkg/scheduler/volumebinder"
- utiltrace "k8s.io/utils/trace"
- )
- const (
- // minFeasibleNodesToFind is the minimum number of nodes that would be scored
- // in each scheduling cycle. This is a semi-arbitrary value to ensure that a
- // certain minimum of nodes are checked for feasibility. This in turn helps
- // ensure a minimum level of spreading.
- minFeasibleNodesToFind = 100
- // minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
- // would be scored in each scheduling cycle. This is a semi-arbitrary value
- // to ensure that a certain minimum of nodes are checked for feasibility.
- // This in turn helps ensure a minimum level of spreading.
- minFeasibleNodesPercentageToFind = 5
- )
- // FitError describes a fit error of a pod.
- type FitError struct {
- Pod *v1.Pod
- NumAllNodes int
- FilteredNodesStatuses framework.NodeToStatusMap
- }
- // ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
- var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
- const (
- // NoNodeAvailableMsg is used to format the message when no nodes are available.
- NoNodeAvailableMsg = "0/%v nodes are available"
- )
- // Error returns detailed information about why the pod failed to fit on each node.
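- // For example, with ten nodes where filtering failed, the formatted message might read
- // "0/10 nodes are available: 4 Insufficient cpu, 6 node(s) didn't match node selector."
- // (the reason strings here are illustrative; the actual reasons come from the filter plugins' statuses).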
- func (f *FitError) Error() string {
- reasons := make(map[string]int)
- for _, status := range f.FilteredNodesStatuses {
- for _, reason := range status.Reasons() {
- reasons[reason]++
- }
- }
- sortReasonsHistogram := func() []string {
- var reasonStrings []string
- for k, v := range reasons {
- reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
- }
- sort.Strings(reasonStrings)
- return reasonStrings
- }
- reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", f.NumAllNodes, strings.Join(sortReasonsHistogram(), ", "))
- return reasonMsg
- }
- // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
- // onto machines.
- // TODO: Rename this type.
- type ScheduleAlgorithm interface {
- Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
- // Preempt receives scheduling errors for a pod and tries to create room for
- // the pod by preempting lower priority pods if possible.
- // It returns the node where preemption happened, a list of preempted pods, a
- // list of pods whose nominated node name should be removed, and error if any.
- Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
- // Extenders returns a slice of scheduler extenders. This is exposed for
- // testing.
- Extenders() []SchedulerExtender
- // Snapshot snapshots scheduler cache and node infos. This is needed
- // for cluster autoscaler integration.
- // TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
- Snapshot() error
- // Framework returns the scheduler framework instance. This is needed for cluster autoscaler integration.
- // TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
- Framework() framework.Framework
- }
- // ScheduleResult represents the result of scheduling one pod. It contains
- // the final selected node, along with intermediate scheduling information.
- type ScheduleResult struct {
- // Name of the host that the scheduler suggests.
- SuggestedHost string
- // Number of nodes the scheduler evaluated while scheduling the pod.
- EvaluatedNodes int
- // Number of nodes found feasible for the pod.
- FeasibleNodes int
- }
- type genericScheduler struct {
- cache internalcache.Cache
- schedulingQueue internalqueue.SchedulingQueue
- framework framework.Framework
- extenders []SchedulerExtender
- nodeInfoSnapshot *internalcache.Snapshot
- volumeBinder *volumebinder.VolumeBinder
- pvcLister corelisters.PersistentVolumeClaimLister
- pdbLister policylisters.PodDisruptionBudgetLister
- disablePreemption bool
- percentageOfNodesToScore int32
- enableNonPreempting bool
- nextStartNodeIndex int
- }
- // Snapshot snapshots scheduler cache and node infos for all fit and priority
- // functions.
- func (g *genericScheduler) Snapshot() error {
- // Used for all fit and priority funcs.
- return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
- }
- // Framework returns the framework instance.
- func (g *genericScheduler) Framework() framework.Framework {
- // Exposed for cluster autoscaler integration.
- return g.framework
- }
- // Schedule tries to schedule the given pod to one of the nodes in the node list.
- // If it succeeds, it will return the name of the node.
- // If it fails, it will return a FitError error with reasons.
- func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
- trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
- defer trace.LogIfLong(100 * time.Millisecond)
- if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
- return result, err
- }
- trace.Step("Basic checks done")
- if err := g.Snapshot(); err != nil {
- return result, err
- }
- trace.Step("Snapshotting scheduler cache and node infos done")
- if g.nodeInfoSnapshot.NumNodes() == 0 {
- return result, ErrNoNodesAvailable
- }
- // Run "prefilter" plugins.
- preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
- if !preFilterStatus.IsSuccess() {
- return result, preFilterStatus.AsError()
- }
- trace.Step("Running prefilter plugins done")
- startPredicateEvalTime := time.Now()
- filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod)
- if err != nil {
- return result, err
- }
- trace.Step("Computing predicates done")
- if len(filteredNodes) == 0 {
- return result, &FitError{
- Pod: pod,
- NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
- FilteredNodesStatuses: filteredNodesStatuses,
- }
- }
- // Run "prescore" plugins.
- prescoreStatus := g.framework.RunPreScorePlugins(ctx, state, pod, filteredNodes)
- if !prescoreStatus.IsSuccess() {
- return result, prescoreStatus.AsError()
- }
- trace.Step("Running prescore plugins done")
- metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
- metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
- startPriorityEvalTime := time.Now()
- // When only one node is left after filtering, just use it.
- if len(filteredNodes) == 1 {
- metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- return ScheduleResult{
- SuggestedHost: filteredNodes[0].Name,
- EvaluatedNodes: 1 + len(filteredNodesStatuses),
- FeasibleNodes: 1,
- }, nil
- }
- priorityList, err := g.prioritizeNodes(ctx, state, pod, filteredNodes)
- if err != nil {
- return result, err
- }
- metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
- host, err := g.selectHost(priorityList)
- trace.Step("Prioritizing done")
- return ScheduleResult{
- SuggestedHost: host,
- EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
- FeasibleNodes: len(filteredNodes),
- }, err
- }
- func (g *genericScheduler) Extenders() []SchedulerExtender {
- return g.extenders
- }
- // selectHost takes a prioritized list of nodes and then picks one
- // in a reservoir sampling manner from the nodes that had the highest score.
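- // For example, if three nodes tie for the highest score, the second tied node replaces the
- // first with probability 1/2 and the third replaces the survivor with probability 1/3, so each
- // of the three ends up selected with an equal probability of 1/3.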
- func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
- if len(nodeScoreList) == 0 {
- return "", fmt.Errorf("empty priorityList")
- }
- maxScore := nodeScoreList[0].Score
- selected := nodeScoreList[0].Name
- cntOfMaxScore := 1
- for _, ns := range nodeScoreList[1:] {
- if ns.Score > maxScore {
- maxScore = ns.Score
- selected = ns.Name
- cntOfMaxScore = 1
- } else if ns.Score == maxScore {
- cntOfMaxScore++
- if rand.Intn(cntOfMaxScore) == 0 {
- // Replace the candidate with probability of 1/cntOfMaxScore
- selected = ns.Name
- }
- }
- }
- return selected, nil
- }
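- // reservoirPickExample is an illustrative, hypothetical sketch of the same single-pass
- // tie-breaking technique used by selectHost above; it is not called anywhere in the scheduler.
- // It returns the index of a maximum element, chosen uniformly among all ties by replacing the
- // current pick with probability 1/cnt whenever another tied element is seen.
- func reservoirPickExample(scores []int64) int {
- 	if len(scores) == 0 {
- 		return -1
- 	}
- 	best, picked, cnt := scores[0], 0, 1
- 	for i, s := range scores[1:] {
- 		switch {
- 		case s > best:
- 			// A new maximum resets the candidate and the tie count.
- 			best, picked, cnt = s, i+1, 1
- 		case s == best:
- 			// Another tie: keep it with probability 1/cnt.
- 			cnt++
- 			if rand.Intn(cnt) == 0 {
- 				picked = i + 1
- 			}
- 		}
- 	}
- 	return picked
- }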
- // Preempt finds nodes with pods that can be preempted to make room for "pod" to
- // schedule. It chooses one of the nodes, preempts the pods on it, and returns
- // 1) the node, 2) the list of preempted pods if such a node is found,
- // 3) a list of pods whose nominated node name should be cleared, and 4) any
- // possible error.
- // Preempt does not update its snapshot. It uses the same snapshot used in the
- // scheduling cycle. This is to avoid a scenario where preempt finds feasible
- // nodes without preempting any pod. When there are many pending pods in the
- // scheduling queue, a nominated pod would go back to the queue behind other
- // pods with the same priority; it would keep blocking the nominated resources
- // from those pods, yet could take a long time before it is retried.
- func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
- // Scheduler may return various types of errors. Consider preemption only if
- // the error is of type FitError.
- fitError, ok := scheduleErr.(*FitError)
- if !ok || fitError == nil {
- return nil, nil, nil, nil
- }
- if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos(), g.enableNonPreempting) {
- klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
- return nil, nil, nil, nil
- }
- allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
- if err != nil {
- return nil, nil, nil, err
- }
- if len(allNodes) == 0 {
- return nil, nil, nil, ErrNoNodesAvailable
- }
- potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
- if len(potentialNodes) == 0 {
- klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
- // In this case, we should clean up any existing nominated node name of the pod.
- return nil, nil, []*v1.Pod{pod}, nil
- }
- var pdbs []*policy.PodDisruptionBudget
- if g.pdbLister != nil {
- pdbs, err = g.pdbLister.List(labels.Everything())
- if err != nil {
- return nil, nil, nil, err
- }
- }
- nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
- if err != nil {
- return nil, nil, nil, err
- }
- // We will only check nodeToVictims with extenders that support preemption.
- // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
- // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
- nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
- if err != nil {
- return nil, nil, nil, err
- }
- candidateNode := pickOneNodeForPreemption(nodeToVictims)
- if candidateNode == nil {
- return nil, nil, nil, nil
- }
- // Lower-priority pods nominated to run on this node may no longer fit on it.
- // So, we should remove their nomination. Removing their nomination updates
- // these pods and moves them to the active queue. It lets the scheduler find
- // another place for them.
- nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
- return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
- }
- // processPreemptionWithExtenders processes preemption with extenders
- func (g *genericScheduler) processPreemptionWithExtenders(
- pod *v1.Pod,
- nodeToVictims map[*v1.Node]*extenderv1.Victims,
- ) (map[*v1.Node]*extenderv1.Victims, error) {
- if len(nodeToVictims) > 0 {
- for _, extender := range g.extenders {
- if extender.SupportsPreemption() && extender.IsInterested(pod) {
- newNodeToVictims, err := extender.ProcessPreemption(
- pod,
- nodeToVictims,
- g.nodeInfoSnapshot.NodeInfos(),
- )
- if err != nil {
- if extender.IsIgnorable() {
- klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
- extender, err)
- continue
- }
- return nil, err
- }
- // Replace nodeToVictims with the new result after preemption, so the
- // rest of the extenders can continue using it as a parameter.
- nodeToVictims = newNodeToVictims
- // If node list becomes empty, no preemption can happen regardless of other extenders.
- if len(nodeToVictims) == 0 {
- break
- }
- }
- }
- }
- return nodeToVictims, nil
- }
- // getLowerPriorityNominatedPods returns pods whose priority is smaller than the
- // priority of the given "pod" and are nominated to run on the given node.
- // Note: We could possibly check if the nominated lower priority pods still fit
- // and return those that no longer fit, but that would require lots of
- // manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
- // worth the complexity, especially because we generally expect to have a very
- // small number of nominated pods per node.
- func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
- pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
- if len(pods) == 0 {
- return nil
- }
- var lowerPriorityPods []*v1.Pod
- podPriority := podutil.GetPodPriority(pod)
- for _, p := range pods {
- if podutil.GetPodPriority(p) < podPriority {
- lowerPriorityPods = append(lowerPriorityPods, p)
- }
- }
- return lowerPriorityPods
- }
- // numFeasibleNodesToFind returns the number of feasible nodes that, once found,
- // lets the scheduler stop its search for more feasible nodes.
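- // For example, assuming percentageOfNodesToScore is left at 0 so the adaptive formula below
- // applies: with 5000 nodes the percentage is 50 - 5000/125 = 10, so 500 feasible nodes are
- // enough; with 100000 nodes the formula drops below the 5% floor, so 5% of 100000 = 5000
- // nodes are sought. Clusters with fewer than minFeasibleNodesToFind (100) nodes, or a
- // configured percentage of 100 or more, are always searched in full.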
- func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
- if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
- return numAllNodes
- }
- adaptivePercentage := g.percentageOfNodesToScore
- if adaptivePercentage <= 0 {
- basePercentageOfNodesToScore := int32(50)
- adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
- if adaptivePercentage < minFeasibleNodesPercentageToFind {
- adaptivePercentage = minFeasibleNodesPercentageToFind
- }
- }
- numNodes = numAllNodes * adaptivePercentage / 100
- if numNodes < minFeasibleNodesToFind {
- return minFeasibleNodesToFind
- }
- return numNodes
- }
- // findNodesThatFitPod filters the nodes to find the ones that fit the pod based on
- // the framework filter plugins and filter extenders.
- func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
- filteredNodesStatuses := make(framework.NodeToStatusMap)
- filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses)
- if err != nil {
- return nil, nil, err
- }
- filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
- if err != nil {
- return nil, nil, err
- }
- return filtered, filteredNodesStatuses, nil
- }
- // findNodesThatPassFilters finds the nodes that fit the filter plugins.
- func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
- allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
- if err != nil {
- return nil, err
- }
- numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
- // Create the filtered list with enough space to avoid growing it
- // and to allow assigning by index.
- filtered := make([]*v1.Node, numNodesToFind)
- if !g.framework.HasFilterPlugins() {
- for i := range filtered {
- filtered[i] = allNodes[i].Node()
- }
- g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % len(allNodes)
- return filtered, nil
- }
- errCh := util.NewErrorChannel()
- var statusesLock sync.Mutex
- var filteredLen int32
- ctx, cancel := context.WithCancel(ctx)
- checkNode := func(i int) {
- // We check the nodes starting from where we left off in the previous scheduling cycle;
- // this is to make sure all nodes have the same chance of being examined across pods.
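- // For example, with 4 nodes and nextStartNodeIndex == 2, the workers visit indices 2, 3, 0, 1.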
- nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
- fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
- if err != nil {
- errCh.SendErrorWithCancel(err, cancel)
- return
- }
- if fits {
- length := atomic.AddInt32(&filteredLen, 1)
- if length > numNodesToFind {
- cancel()
- atomic.AddInt32(&filteredLen, -1)
- } else {
- filtered[length-1] = nodeInfo.Node()
- }
- } else {
- statusesLock.Lock()
- if !status.IsSuccess() {
- statuses[nodeInfo.Node().Name] = status
- }
- statusesLock.Unlock()
- }
- }
- beginCheckNode := time.Now()
- statusCode := framework.Success
- defer func() {
- // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
- // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
- // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
- metrics.FrameworkExtensionPointDuration.WithLabelValues(framework.Filter, statusCode.String()).Observe(metrics.SinceInSeconds(beginCheckNode))
- }()
- // Stop searching for more nodes once the configured number of feasible nodes
- // has been found.
- workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
- processedNodes := int(filteredLen) + len(statuses)
- g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
- filtered = filtered[:filteredLen]
- if err := errCh.ReceiveError(); err != nil {
- statusCode = framework.Error
- return nil, err
- }
- return filtered, nil
- }
- func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
- for _, extender := range g.extenders {
- if len(filtered) == 0 {
- break
- }
- if !extender.IsInterested(pod) {
- continue
- }
- filteredList, failedMap, err := extender.Filter(pod, filtered)
- if err != nil {
- if extender.IsIgnorable() {
- klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
- extender, err)
- continue
- }
- return nil, err
- }
- for failedNodeName, failedMsg := range failedMap {
- if _, found := statuses[failedNodeName]; !found {
- statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
- } else {
- statuses[failedNodeName].AppendReason(failedMsg)
- }
- }
- filtered = filteredList
- }
- return filtered, nil
- }
- // addNominatedPods adds pods with equal or greater priority which are nominated
- // to run on the node. It returns 1) whether any pod was added, 2) the augmented
- // cycleState, 3) the augmented nodeInfo, and 4) any error encountered.
- func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, state *framework.CycleState,
- nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
- if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
- // This may happen only in tests.
- return false, state, nodeInfo, nil
- }
- nominatedPods := g.schedulingQueue.NominatedPodsForNode(nodeInfo.Node().Name)
- if len(nominatedPods) == 0 {
- return false, state, nodeInfo, nil
- }
- nodeInfoOut := nodeInfo.Clone()
- stateOut := state.Clone()
- podsAdded := false
- for _, p := range nominatedPods {
- if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
- nodeInfoOut.AddPod(p)
- status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
- if !status.IsSuccess() {
- return false, state, nodeInfo, status.AsError()
- }
- podsAdded = true
- }
- }
- return podsAdded, stateOut, nodeInfoOut, nil
- }
- // podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
- // filter plugins.
- // This function is called from two different places: Schedule and Preempt.
- // When it is called from Schedule, we want to test whether the pod is
- // schedulable on the node with all the existing pods on the node plus higher
- // and equal priority pods nominated to run on the node.
- // When it is called from Preempt, we should remove the victims of preemption
- // and add the nominated pods. Removal of the victims is done by
- // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
- // NodeInfo before calling this function.
- func (g *genericScheduler) podPassesFiltersOnNode(
- ctx context.Context,
- state *framework.CycleState,
- pod *v1.Pod,
- info *schedulernodeinfo.NodeInfo,
- ) (bool, *framework.Status, error) {
- var status *framework.Status
- podsAdded := false
- // We run filters twice in some cases. If the node has greater or equal priority
- // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
- // If all filters succeed in this pass, we run them again when these
- // nominated pods are not added. This second pass is necessary because some
- // filters such as inter-pod affinity may not pass without the nominated pods.
- // If there are no nominated pods for the node or if the first run of the
- // filters fail, we don't run the second pass.
- // We consider only equal or higher priority pods in the first pass, because
- // those are the pods the current "pod" must yield to; it should not take a space
- // opened for running them. It is OK if the current "pod" takes resources freed
- // for lower-priority pods.
- // Requiring that the new pod is schedulable in both circumstances ensures that
- // we are making a conservative decision: filters like resources and inter-pod
- // anti-affinity are more likely to fail when the nominated pods are treated
- // as running, while filters like pod affinity are more likely to fail when
- // the nominated pods are treated as not running. We can't just assume the
- // nominated pods are running because they are not running right now and in fact,
- // they may end up getting scheduled to a different node.
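- // For example, a pod with anti-affinity toward a nominated pod can only fail the first pass
- // (nominated pods treated as running), while a pod relying on affinity to a nominated pod can
- // only fail the second pass (nominated pods absent); requiring both passes to succeed covers
- // both cases.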
- for i := 0; i < 2; i++ {
- stateToUse := state
- nodeInfoToUse := info
- if i == 0 {
- var err error
- podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, state, info)
- if err != nil {
- return false, nil, err
- }
- } else if !podsAdded || !status.IsSuccess() {
- break
- }
- statusMap := g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
- status = statusMap.Merge()
- if !status.IsSuccess() && !status.IsUnschedulable() {
- return false, status, status.AsError()
- }
- }
- return status.IsSuccess(), status, nil
- }
- // prioritizeNodes prioritizes the nodes by running the score plugins,
- // which return a score for each node from the call to RunScorePlugins().
- // The scores from each plugin are added together to make the score for that node, then
- // any extenders are run as well.
- // All scores are finally combined (added) to get the total weighted scores of all nodes.
- func (g *genericScheduler) prioritizeNodes(
- ctx context.Context,
- state *framework.CycleState,
- pod *v1.Pod,
- nodes []*v1.Node,
- ) (framework.NodeScoreList, error) {
- // If no score plugins or extenders are configured, then all nodes will have a score of one.
- // This is required to generate the priority list in the required format.
- if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
- result := make(framework.NodeScoreList, 0, len(nodes))
- for i := range nodes {
- result = append(result, framework.NodeScore{
- Name: nodes[i].Name,
- Score: 1,
- })
- }
- return result, nil
- }
- // Run the Score plugins.
- scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
- if !scoreStatus.IsSuccess() {
- return framework.NodeScoreList{}, scoreStatus.AsError()
- }
- // Summarize all scores.
- result := make(framework.NodeScoreList, 0, len(nodes))
- for i := range nodes {
- result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
- for j := range scoresMap {
- result[i].Score += scoresMap[j][i].Score
- }
- }
- if len(g.extenders) != 0 && nodes != nil {
- var mu sync.Mutex
- var wg sync.WaitGroup
- combinedScores := make(map[string]int64, len(nodes))
- for i := range g.extenders {
- if !g.extenders[i].IsInterested(pod) {
- continue
- }
- wg.Add(1)
- go func(extIndex int) {
- metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
- defer func() {
- metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
- wg.Done()
- }()
- prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
- if err != nil {
- // Prioritization errors from an extender can be ignored; let k8s/other extenders determine the priorities.
- return
- }
- mu.Lock()
- for i := range *prioritizedList {
- host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
- if klog.V(10) {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
- }
- combinedScores[host] += score * weight
- }
- mu.Unlock()
- }(i)
- }
- // Wait for all goroutines to finish.
- wg.Wait()
- for i := range result {
- // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
- // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
- result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
- }
- }
- if klog.V(10) {
- for i := range result {
- klog.Infof("Host %s => Score %d", result[i].Name, result[i].Score)
- }
- }
- return result, nil
- }
- // pickOneNodeForPreemption chooses one node among the given nodes. It assumes
- // pods in each map entry are ordered by decreasing priority.
- // It picks a node based on the following criteria:
- // 1. A node with minimum number of PDB violations.
- // 2. A node with the minimum highest-priority victim is picked.
- // 3. Ties are broken by sum of priorities of all victims.
- // 4. If there are still ties, node with the minimum number of victims is picked.
- // 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
- // 6. If there are still ties, the first such node is picked (sort of randomly).
- // The slices 'minNodes1' and 'minNodes2' are reused here to save memory
- // allocation and garbage collection time.
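- // For example (illustrative scenario), if preempting on node A would violate one
- // PodDisruptionBudget while preempting on node B would violate none, node B is chosen by
- // criterion 1 even if its victims have higher priorities than node A's.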
- func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
- if len(nodesToVictims) == 0 {
- return nil
- }
- minNumPDBViolatingPods := int64(math.MaxInt32)
- var minNodes1 []*v1.Node
- lenNodes1 := 0
- for node, victims := range nodesToVictims {
- if len(victims.Pods) == 0 {
- // We found a node that doesn't need any preemption. Return it!
- // This should happen rarely when one or more pods are terminated between
- // the time that scheduler tries to schedule the pod and the time that
- // preemption logic tries to find nodes for preemption.
- return node
- }
- numPDBViolatingPods := victims.NumPDBViolations
- if numPDBViolatingPods < minNumPDBViolatingPods {
- minNumPDBViolatingPods = numPDBViolatingPods
- minNodes1 = nil
- lenNodes1 = 0
- }
- if numPDBViolatingPods == minNumPDBViolatingPods {
- minNodes1 = append(minNodes1, node)
- lenNodes1++
- }
- }
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
- // There is more than one node with the minimum number of PDB-violating pods. Find
- // the one with the minimum highest-priority victim.
- minHighestPriority := int32(math.MaxInt32)
- var minNodes2 = make([]*v1.Node, lenNodes1)
- lenNodes2 := 0
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- victims := nodesToVictims[node]
- // highestPodPriority is the highest priority among the victims on this node.
- highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
- if highestPodPriority < minHighestPriority {
- minHighestPriority = highestPodPriority
- lenNodes2 = 0
- }
- if highestPodPriority == minHighestPriority {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- if lenNodes2 == 1 {
- return minNodes2[0]
- }
- // There are a few nodes with the minimum highest-priority victim. Find the
- // one with the smallest sum of victim priorities.
- minSumPriorities := int64(math.MaxInt64)
- lenNodes1 = 0
- for i := 0; i < lenNodes2; i++ {
- var sumPriorities int64
- node := minNodes2[i]
- for _, pod := range nodesToVictims[node].Pods {
- // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
- // needed so that a node with a few pods with negative priority is not
- // picked over a node with a smaller number of pods with the same negative
- // priority (and similar scenarios).
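- // For example, without the offset a node with three priority -10 victims (sum -30)
- // would beat a node with a single priority -10 victim (sum -10); with the offset the
- // single-victim node correctly has the smaller sum.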
- sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
- }
- if sumPriorities < minSumPriorities {
- minSumPriorities = sumPriorities
- lenNodes1 = 0
- }
- if sumPriorities == minSumPriorities {
- minNodes1[lenNodes1] = node
- lenNodes1++
- }
- }
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
- // There are a few nodes tied on the minimum highest-priority victim and the sum of
- // priorities. Find the one with the minimum number of victim pods.
- minNumPods := math.MaxInt32
- lenNodes2 = 0
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- numPods := len(nodesToVictims[node].Pods)
- if numPods < minNumPods {
- minNumPods = numPods
- lenNodes2 = 0
- }
- if numPods == minNumPods {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- if lenNodes2 == 1 {
- return minNodes2[0]
- }
- // There are a few nodes with the same number of pods.
- // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)).
- latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
- if latestStartTime == nil {
- // If the earliest start time of all pods on the 1st node is nil, just return it,
- // which is not expected to happen.
- klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
- return minNodes2[0]
- }
- nodeToReturn := minNodes2[0]
- for i := 1; i < lenNodes2; i++ {
- node := minNodes2[i]
- // Get earliest start time of all pods on the current node.
- earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
- if earliestStartTimeOnNode == nil {
- klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
- continue
- }
- if earliestStartTimeOnNode.After(latestStartTime.Time) {
- latestStartTime = earliestStartTimeOnNode
- nodeToReturn = node
- }
- }
- return nodeToReturn
- }
- // selectNodesForPreemption finds all the nodes with possible victims for
- // preemption in parallel.
- func (g *genericScheduler) selectNodesForPreemption(
- ctx context.Context,
- state *framework.CycleState,
- pod *v1.Pod,
- potentialNodes []*schedulernodeinfo.NodeInfo,
- pdbs []*policy.PodDisruptionBudget,
- ) (map[*v1.Node]*extenderv1.Victims, error) {
- nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
- var resultLock sync.Mutex
- checkNode := func(i int) {
- nodeInfoCopy := potentialNodes[i].Clone()
- stateCopy := state.Clone()
- pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
- if fits {
- resultLock.Lock()
- victims := extenderv1.Victims{
- Pods: pods,
- NumPDBViolations: int64(numPDBViolations),
- }
- nodeToVictims[potentialNodes[i].Node()] = &victims
- resultLock.Unlock()
- }
- }
- workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
- return nodeToVictims, nil
- }
- // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
- // and "nonViolatingPods" based on whether their PDBs will be violated if they are
- // preempted.
- // This function is stable and does not change the order of received pods. So, if it
- // receives a sorted list, grouping will preserve the order of the input list.
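- // For example, if a matching PDB currently allows one more disruption and two of the given
- // pods match it, the first matching pod consumes the allowance and is treated as
- // non-violating, while the second is treated as violating.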
- func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
- pdbsAllowed := make([]int32, len(pdbs))
- for i, pdb := range pdbs {
- pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
- }
- for _, obj := range pods {
- pod := obj
- pdbForPodIsViolated := false
- // A pod with no labels will not match any PDB. So, no need to check.
- if len(pod.Labels) != 0 {
- for i, pdb := range pdbs {
- if pdb.Namespace != pod.Namespace {
- continue
- }
- selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
- if err != nil {
- continue
- }
- // A PDB with a nil or empty selector matches nothing.
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
- continue
- }
- // We have found a matching PDB.
- if pdbsAllowed[i] <= 0 {
- pdbForPodIsViolated = true
- break
- } else {
- pdbsAllowed[i]--
- }
- }
- }
- if pdbForPodIsViolated {
- violatingPods = append(violatingPods, pod)
- } else {
- nonViolatingPods = append(nonViolatingPods, pod)
- }
- }
- return violatingPods, nonViolatingPods
- }
- // selectVictimsOnNode finds minimum set of pods on the given node that should
- // be preempted in order to make enough room for "pod" to be scheduled. The
- // minimum set selected is subject to the constraint that a higher-priority pod
- // is never preempted when a lower-priority pod could be (higher/lower relative
- // to one another, not relative to the preemptor "pod").
- // The algorithm first checks if the pod can be scheduled on the node when all the
- // lower priority pods are gone. If so, it sorts all the lower priority pods by
- // their priority and then puts them into two groups of those whose PodDisruptionBudget
- // will be violated if preempted and other non-violating pods. Both groups are
- // sorted by priority. It first tries to reprieve as many PDB-violating pods as
- // possible and then does the same for non-PDB-violating pods while checking
- // that the "pod" can still fit on the node.
- // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
- // due to pod affinity, node affinity, or node anti-affinity reasons. None of
- // these predicates can be satisfied by removing more pods from the node.
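- // In other words, all lower-priority pods are first assumed removed; they are then added back
- // one at a time in MoreImportantPod order, and a pod stays a victim only if the preemptor no
- // longer fits once that pod is re-added (an illustrative restatement of the reprieve loop below).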
- func (g *genericScheduler) selectVictimsOnNode(
- ctx context.Context,
- state *framework.CycleState,
- pod *v1.Pod,
- nodeInfo *schedulernodeinfo.NodeInfo,
- pdbs []*policy.PodDisruptionBudget,
- ) ([]*v1.Pod, int, bool) {
- var potentialVictims []*v1.Pod
- removePod := func(rp *v1.Pod) error {
- if err := nodeInfo.RemovePod(rp); err != nil {
- return err
- }
- status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
- if !status.IsSuccess() {
- return status.AsError()
- }
- return nil
- }
- addPod := func(ap *v1.Pod) error {
- nodeInfo.AddPod(ap)
- status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
- if !status.IsSuccess() {
- return status.AsError()
- }
- return nil
- }
- // As the first step, remove all the lower priority pods from the node and
- // check if the given pod can be scheduled.
- podPriority := podutil.GetPodPriority(pod)
- for _, p := range nodeInfo.Pods() {
- if podutil.GetPodPriority(p) < podPriority {
- potentialVictims = append(potentialVictims, p)
- if err := removePod(p); err != nil {
- return nil, 0, false
- }
- }
- }
- // If the new pod does not fit after removing all the lower priority pods,
- // we are almost done and this node is not suitable for preemption. The only
- // condition that we could check is if the "pod" is failing to schedule due to
- // inter-pod affinity to one or more victims, but we have decided not to
- // support this case for performance reasons. Having affinity to lower
- // priority pods is not a recommended configuration anyway.
- if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits {
- if err != nil {
- klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
- }
- return nil, 0, false
- }
- var victims []*v1.Pod
- numViolatingVictim := 0
- sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
- // Try to reprieve as many pods as possible. We first try to reprieve the PDB
- // violating victims and then other non-violating ones. In both cases, we start
- // from the highest priority victims.
- violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
- reprievePod := func(p *v1.Pod) (bool, error) {
- if err := addPod(p); err != nil {
- return false, err
- }
- fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
- if !fits {
- if err := removePod(p); err != nil {
- return false, err
- }
- victims = append(victims, p)
- klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
- }
- return fits, nil
- }
- for _, p := range violatingVictims {
- if fits, err := reprievePod(p); err != nil {
- klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
- return nil, 0, false
- } else if !fits {
- numViolatingVictim++
- }
- }
- // Now we try to reprieve non-violating victims.
- for _, p := range nonViolatingVictims {
- if _, err := reprievePod(p); err != nil {
- klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
- return nil, 0, false
- }
- }
- return victims, numViolatingVictim, true
- }
- // nodesWherePreemptionMightHelp returns a list of nodes whose failed filters
- // may be satisfied by removing pods from the node.
- func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *FitError) []*schedulernodeinfo.NodeInfo {
- var potentialNodes []*schedulernodeinfo.NodeInfo
- for _, node := range nodes {
- name := node.Node().Name
- // We rely on the status reported by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable' -
- // to determine whether preemption may help on the node.
- if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
- continue
- }
- klog.V(3).Infof("Node %v is a potential node for preemption.", name)
- potentialNodes = append(potentialNodes, node)
- }
- return potentialNodes
- }
- // podEligibleToPreemptOthers determines whether this pod should be considered
- // for preempting other pods or not. If this pod has already preempted other
- // pods and those are in their graceful termination period, it shouldn't be
- // considered for preemption.
- // We look at the node that is nominated for this pod, and as long as there are
- // terminating pods on that node, we don't consider this pod for preempting more pods.
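- // For example, if the pod's nominated node still has a lower-priority pod with a
- // DeletionTimestamp set, the pod is assumed to be waiting for that victim's graceful
- // termination and is not considered for further preemption.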
- func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos listers.NodeInfoLister, enableNonPreempting bool) bool {
- if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
- klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
- return false
- }
- nomNodeName := pod.Status.NominatedNodeName
- if len(nomNodeName) > 0 {
- if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
- podPriority := podutil.GetPodPriority(pod)
- for _, p := range nodeInfo.Pods() {
- if p.DeletionTimestamp != nil && podutil.GetPodPriority(p) < podPriority {
- // There is a terminating pod on the nominated node.
- return false
- }
- }
- }
- }
- return true
- }
- // podPassesBasicChecks makes basic sanity checks on whether the pod can be scheduled.
- func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
- // Check PVCs used by the pod
- namespace := pod.Namespace
- manifest := &(pod.Spec)
- for i := range manifest.Volumes {
- volume := &manifest.Volumes[i]
- if volume.PersistentVolumeClaim == nil {
- // Volume is not a PVC, ignore
- continue
- }
- pvcName := volume.PersistentVolumeClaim.ClaimName
- pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
- if err != nil {
- // The error already has enough context ("persistentvolumeclaim "myclaim" not found").
- return err
- }
- if pvc.DeletionTimestamp != nil {
- return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
- }
- }
- return nil
- }
- // NewGenericScheduler creates a genericScheduler object.
- func NewGenericScheduler(
- cache internalcache.Cache,
- podQueue internalqueue.SchedulingQueue,
- nodeInfoSnapshot *internalcache.Snapshot,
- framework framework.Framework,
- extenders []SchedulerExtender,
- volumeBinder *volumebinder.VolumeBinder,
- pvcLister corelisters.PersistentVolumeClaimLister,
- pdbLister policylisters.PodDisruptionBudgetLister,
- disablePreemption bool,
- percentageOfNodesToScore int32,
- enableNonPreempting bool) ScheduleAlgorithm {
- return &genericScheduler{
- cache: cache,
- schedulingQueue: podQueue,
- framework: framework,
- extenders: extenders,
- nodeInfoSnapshot: nodeInfoSnapshot,
- volumeBinder: volumeBinder,
- pvcLister: pvcLister,
- pdbLister: pdbLister,
- disablePreemption: disablePreemption,
- percentageOfNodesToScore: percentageOfNodesToScore,
- enableNonPreempting: enableNonPreempting,
- }
- }
|