- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // The Controller sets tainted annotations on nodes.
- // Tainted nodes should not be used for new workloads and
- // some effort should be given to getting existing workloads
- // off of tainted nodes.
- package nodelifecycle
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "k8s.io/klog"
- coordv1 "k8s.io/api/coordination/v1"
- v1 "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- appsv1informers "k8s.io/client-go/informers/apps/v1"
- coordinformers "k8s.io/client-go/informers/coordination/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- appsv1listers "k8s.io/client-go/listers/apps/v1"
- coordlisters "k8s.io/client-go/listers/coordination/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/flowcontrol"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/component-base/metrics/prometheus/ratelimiter"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
- nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
- kubefeatures "k8s.io/kubernetes/pkg/features"
- kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
- utilnode "k8s.io/kubernetes/pkg/util/node"
- taintutils "k8s.io/kubernetes/pkg/util/taints"
- )
- func init() {
- // Register prometheus metrics
- Register()
- }
- var (
- // UnreachableTaintTemplate is the taint for when a node becomes unreachable.
- UnreachableTaintTemplate = &v1.Taint{
- Key: v1.TaintNodeUnreachable,
- Effect: v1.TaintEffectNoExecute,
- }
- // NotReadyTaintTemplate is the taint for when a node is not ready for
- // executing pods
- NotReadyTaintTemplate = &v1.Taint{
- Key: v1.TaintNodeNotReady,
- Effect: v1.TaintEffectNoExecute,
- }
- // map {NodeConditionType: {ConditionStatus: TaintKey}}
- // represents which NodeConditionType under which ConditionStatus should be
- // tainted with which TaintKey.
- // For certain NodeConditionTypes there are multiple {ConditionStatus, TaintKey} pairs.
- nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
- v1.NodeReady: {
- v1.ConditionFalse: v1.TaintNodeNotReady,
- v1.ConditionUnknown: v1.TaintNodeUnreachable,
- },
- v1.NodeMemoryPressure: {
- v1.ConditionTrue: v1.TaintNodeMemoryPressure,
- },
- v1.NodeDiskPressure: {
- v1.ConditionTrue: v1.TaintNodeDiskPressure,
- },
- v1.NodeNetworkUnavailable: {
- v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
- },
- v1.NodePIDPressure: {
- v1.ConditionTrue: v1.TaintNodePIDPressure,
- },
- }
- taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
- v1.TaintNodeNotReady: v1.NodeReady,
- v1.TaintNodeUnreachable: v1.NodeReady,
- v1.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
- v1.TaintNodeMemoryPressure: v1.NodeMemoryPressure,
- v1.TaintNodeDiskPressure: v1.NodeDiskPressure,
- v1.TaintNodePIDPressure: v1.NodePIDPressure,
- }
- )
- // ZoneState is the state of a given zone.
- type ZoneState string
- const (
- stateInitial = ZoneState("Initial")
- stateNormal = ZoneState("Normal")
- stateFullDisruption = ZoneState("FullDisruption")
- statePartialDisruption = ZoneState("PartialDisruption")
- )
- const (
- // The amount of time the node controller should sleep between retrying node health updates
- retrySleepTime = 20 * time.Millisecond
- nodeNameKeyIndex = "spec.nodeName"
- // podUpdateWorkerSize assumes that in most cases pods will be handled by the monitorNodeHealth pass.
- // Pod update workers will only handle lagging cache pods. 4 workers should be enough.
- podUpdateWorkerSize = 4
- )
- // labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
- // primaryKey and secondaryKey are keys of labels to reconcile.
- // - If both keys exist but their values don't match, use the value from the
- // primaryKey as the source of truth to reconcile.
- // - If ensureSecondaryExists is true, and the secondaryKey does not
- // exist, secondaryKey will be added with the value of the primaryKey.
- var labelReconcileInfo = []struct {
- primaryKey string
- secondaryKey string
- ensureSecondaryExists bool
- }{
- {
- // Reconcile the beta and the stable OS label using the beta label as
- // the source of truth.
- // TODO(#73084): switch to using the stable label as the source of
- // truth in v1.18.
- primaryKey: kubeletapis.LabelOS,
- secondaryKey: v1.LabelOSStable,
- ensureSecondaryExists: true,
- },
- {
- // Reconcile the beta and the stable arch label using the beta label as
- // the source of truth.
- // TODO(#73084): switch to using the stable label as the source of
- // truth in v1.18.
- primaryKey: kubeletapis.LabelArch,
- secondaryKey: v1.LabelArchStable,
- ensureSecondaryExists: true,
- },
- }
- type nodeHealthData struct {
- probeTimestamp metav1.Time
- readyTransitionTimestamp metav1.Time
- status *v1.NodeStatus
- lease *coordv1.Lease
- }
- func (n *nodeHealthData) deepCopy() *nodeHealthData {
- if n == nil {
- return nil
- }
- return &nodeHealthData{
- probeTimestamp: n.probeTimestamp,
- readyTransitionTimestamp: n.readyTransitionTimestamp,
- status: n.status.DeepCopy(),
- lease: n.lease.DeepCopy(),
- }
- }
- type nodeHealthMap struct {
- lock sync.RWMutex
- nodeHealths map[string]*nodeHealthData
- }
- func newNodeHealthMap() *nodeHealthMap {
- return &nodeHealthMap{
- nodeHealths: make(map[string]*nodeHealthData),
- }
- }
- // getDeepCopy returns a copy of the node health data.
- // It prevents the data from being changed after it has been retrieved from the map.
- func (n *nodeHealthMap) getDeepCopy(name string) *nodeHealthData {
- n.lock.RLock()
- defer n.lock.RUnlock()
- return n.nodeHealths[name].deepCopy()
- }
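- // set stores the given health data for a node, replacing any previous entry.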
- func (n *nodeHealthMap) set(name string, data *nodeHealthData) {
- n.lock.Lock()
- defer n.lock.Unlock()
- n.nodeHealths[name] = data
- }
- type podUpdateItem struct {
- namespace string
- name string
- }
- type evictionStatus int
- const (
- unmarked = iota
- toBeEvicted
- evicted
- )
- // nodeEvictionMap stores evictionStatus data for each node.
- type nodeEvictionMap struct {
- lock sync.Mutex
- nodeEvictions map[string]evictionStatus
- }
- func newNodeEvictionMap() *nodeEvictionMap {
- return &nodeEvictionMap{
- nodeEvictions: make(map[string]evictionStatus),
- }
- }
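- // registerNode adds the node to the eviction map with an initial 'unmarked' status.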
- func (n *nodeEvictionMap) registerNode(nodeName string) {
- n.lock.Lock()
- defer n.lock.Unlock()
- n.nodeEvictions[nodeName] = unmarked
- }
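- // unregisterNode removes the node's eviction status from the map.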
- func (n *nodeEvictionMap) unregisterNode(nodeName string) {
- n.lock.Lock()
- defer n.lock.Unlock()
- delete(n.nodeEvictions, nodeName)
- }
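- // setStatus updates the eviction status of a registered node.
- // It returns false if the node is not registered.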
- func (n *nodeEvictionMap) setStatus(nodeName string, status evictionStatus) bool {
- n.lock.Lock()
- defer n.lock.Unlock()
- if _, exists := n.nodeEvictions[nodeName]; !exists {
- return false
- }
- n.nodeEvictions[nodeName] = status
- return true
- }
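- // getStatus returns the eviction status of the node and whether the node is registered.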
- func (n *nodeEvictionMap) getStatus(nodeName string) (evictionStatus, bool) {
- n.lock.Lock()
- defer n.lock.Unlock()
- if _, exists := n.nodeEvictions[nodeName]; !exists {
- return unmarked, false
- }
- return n.nodeEvictions[nodeName], true
- }
- // Controller is the controller that manages the lifecycle of nodes.
- type Controller struct {
- taintManager *scheduler.NoExecuteTaintManager
- podLister corelisters.PodLister
- podInformerSynced cache.InformerSynced
- kubeClient clientset.Interface
- // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
- // to avoid the problem with time skew across the cluster.
- now func() metav1.Time
- enterPartialDisruptionFunc func(nodeNum int) float32
- enterFullDisruptionFunc func(nodeNum int) float32
- computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
- knownNodeSet map[string]*v1.Node
- // per Node map storing last observed health together with a local time when it was observed.
- nodeHealthMap *nodeHealthMap
- // evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
- // TODO(#83954): API calls shouldn't be executed under the lock.
- evictorLock sync.Mutex
- nodeEvictionMap *nodeEvictionMap
- // workers that evict pods from unresponsive nodes.
- zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
- // workers that are responsible for tainting nodes.
- zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
- nodesToRetry sync.Map
- zoneStates map[string]ZoneState
- daemonSetStore appsv1listers.DaemonSetLister
- daemonSetInformerSynced cache.InformerSynced
- leaseLister coordlisters.LeaseLister
- leaseInformerSynced cache.InformerSynced
- nodeLister corelisters.NodeLister
- nodeInformerSynced cache.InformerSynced
- getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
- recorder record.EventRecorder
- // Value controlling Controller monitoring period, i.e. how often the Controller
- // checks the node health signal posted from the kubelet. This value should be lower than
- // nodeMonitorGracePeriod.
- // TODO: Change node health monitor to watch based.
- nodeMonitorPeriod time.Duration
- // When node is just created, e.g. cluster bootstrap or node creation, we give
- // a longer grace period.
- nodeStartupGracePeriod time.Duration
- // Controller will not proactively sync node health, but will monitor node
- // health signal updated from kubelet. There are 2 kinds of node healthiness
- // signals: NodeStatus and NodeLease. NodeLease signal is generated only when
- // NodeLease feature is enabled. If it doesn't receive an update for this amount
- // of time, it will start posting "NodeReady==ConditionUnknown". The amount of
- // time before the Controller starts evicting pods is controlled via the flag
- // 'pod-eviction-timeout'.
- // Note: be cautious when changing the constant, it must work with
- // nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
- // controller. The node health signal update frequency is the minimal of the
- // two.
- // There are several constraints:
- // 1. nodeMonitorGracePeriod must be N times more than the node health signal
- // update frequency, where N is the number of retries allowed for the kubelet to
- // post node status/lease. It is pointless to make nodeMonitorGracePeriod
- // be less than the node health signal update frequency, since there will
- // only be fresh values from Kubelet at an interval of node health signal
- // update frequency. The constant must be less than podEvictionTimeout.
- // 2. nodeMonitorGracePeriod can't be too large for user experience - larger
- // value takes longer for user to see up-to-date node health.
- nodeMonitorGracePeriod time.Duration
- podEvictionTimeout time.Duration
- evictionLimiterQPS float32
- secondaryEvictionLimiterQPS float32
- largeClusterThreshold int32
- unhealthyZoneThreshold float32
- // if set to true Controller will start TaintManager that will evict Pods from
- // tainted nodes, if they're not tolerated.
- runTaintManager bool
- // if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
- // taints instead of evicting Pods itself.
- useTaintBasedEvictions bool
- nodeUpdateQueue workqueue.Interface
- podUpdateQueue workqueue.RateLimitingInterface
- }
- // NewNodeLifecycleController returns a new node lifecycle controller.
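- //
- // An illustrative construction sketch (a rough example only; the informer
- // factory, kubeClient, and stopCh are assumed to be built elsewhere, and the
- // numeric values are illustrative rather than recommended defaults):
- //
- //   nc, err := NewNodeLifecycleController(
- //       informerFactory.Coordination().V1().Leases(),
- //       informerFactory.Core().V1().Pods(),
- //       informerFactory.Core().V1().Nodes(),
- //       informerFactory.Apps().V1().DaemonSets(),
- //       kubeClient,
- //       5*time.Second,  // nodeMonitorPeriod
- //       time.Minute,    // nodeStartupGracePeriod
- //       40*time.Second, // nodeMonitorGracePeriod
- //       5*time.Minute,  // podEvictionTimeout
- //       0.1,            // evictionLimiterQPS
- //       0.01,           // secondaryEvictionLimiterQPS
- //       50,             // largeClusterThreshold
- //       0.55,           // unhealthyZoneThreshold
- //       true,           // runTaintManager
- //       true,           // useTaintBasedEvictions
- //   )
- //   if err != nil {
- //       klog.Fatalf("failed to create node lifecycle controller: %v", err)
- //   }
- //   go nc.Run(stopCh)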
- func NewNodeLifecycleController(
- leaseInformer coordinformers.LeaseInformer,
- podInformer coreinformers.PodInformer,
- nodeInformer coreinformers.NodeInformer,
- daemonSetInformer appsv1informers.DaemonSetInformer,
- kubeClient clientset.Interface,
- nodeMonitorPeriod time.Duration,
- nodeStartupGracePeriod time.Duration,
- nodeMonitorGracePeriod time.Duration,
- podEvictionTimeout time.Duration,
- evictionLimiterQPS float32,
- secondaryEvictionLimiterQPS float32,
- largeClusterThreshold int32,
- unhealthyZoneThreshold float32,
- runTaintManager bool,
- useTaintBasedEvictions bool,
- ) (*Controller, error) {
- if kubeClient == nil {
- klog.Fatalf("kubeClient is nil when starting Controller")
- }
- eventBroadcaster := record.NewBroadcaster()
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
- eventBroadcaster.StartLogging(klog.Infof)
- klog.Infof("Sending events to api server.")
- eventBroadcaster.StartRecordingToSink(
- &v1core.EventSinkImpl{
- Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
- })
- if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
- }
- nc := &Controller{
- kubeClient: kubeClient,
- now: metav1.Now,
- knownNodeSet: make(map[string]*v1.Node),
- nodeHealthMap: newNodeHealthMap(),
- nodeEvictionMap: newNodeEvictionMap(),
- recorder: recorder,
- nodeMonitorPeriod: nodeMonitorPeriod,
- nodeStartupGracePeriod: nodeStartupGracePeriod,
- nodeMonitorGracePeriod: nodeMonitorGracePeriod,
- zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
- zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
- nodesToRetry: sync.Map{},
- zoneStates: make(map[string]ZoneState),
- podEvictionTimeout: podEvictionTimeout,
- evictionLimiterQPS: evictionLimiterQPS,
- secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
- largeClusterThreshold: largeClusterThreshold,
- unhealthyZoneThreshold: unhealthyZoneThreshold,
- runTaintManager: runTaintManager,
- useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
- nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"),
- podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
- }
- if useTaintBasedEvictions {
- klog.Infof("Controller is using taint based evictions.")
- }
- nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
- nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
- nc.computeZoneStateFunc = nc.ComputeZoneState
- podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- nc.podUpdated(nil, pod)
- if nc.taintManager != nil {
- nc.taintManager.PodUpdated(nil, pod)
- }
- },
- UpdateFunc: func(prev, obj interface{}) {
- prevPod := prev.(*v1.Pod)
- newPod := obj.(*v1.Pod)
- nc.podUpdated(prevPod, newPod)
- if nc.taintManager != nil {
- nc.taintManager.PodUpdated(prevPod, newPod)
- }
- },
- DeleteFunc: func(obj interface{}) {
- pod, isPod := obj.(*v1.Pod)
- // We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
- if !isPod {
- deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- klog.Errorf("Received unexpected object: %v", obj)
- return
- }
- pod, ok = deletedState.Obj.(*v1.Pod)
- if !ok {
- klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
- return
- }
- }
- nc.podUpdated(pod, nil)
- if nc.taintManager != nil {
- nc.taintManager.PodUpdated(pod, nil)
- }
- },
- })
- nc.podInformerSynced = podInformer.Informer().HasSynced
- podInformer.Informer().AddIndexers(cache.Indexers{
- nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
- pod, ok := obj.(*v1.Pod)
- if !ok {
- return []string{}, nil
- }
- if len(pod.Spec.NodeName) == 0 {
- return []string{}, nil
- }
- return []string{pod.Spec.NodeName}, nil
- },
- })
- podIndexer := podInformer.Informer().GetIndexer()
- nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
- objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
- if err != nil {
- return nil, err
- }
- pods := make([]*v1.Pod, 0, len(objs))
- for _, obj := range objs {
- pod, ok := obj.(*v1.Pod)
- if !ok {
- continue
- }
- pods = append(pods, pod)
- }
- return pods, nil
- }
- nc.podLister = podInformer.Lister()
- if nc.runTaintManager {
- podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
- nodeLister := nodeInformer.Lister()
- nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
- nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
- nc.taintManager.NodeUpdated(nil, node)
- return nil
- }),
- UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
- nc.taintManager.NodeUpdated(oldNode, newNode)
- return nil
- }),
- DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
- nc.taintManager.NodeUpdated(node, nil)
- return nil
- }),
- })
- }
- klog.Infof("Controller will reconcile labels.")
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
- nc.nodeUpdateQueue.Add(node.Name)
- nc.nodeEvictionMap.registerNode(node.Name)
- return nil
- }),
- UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
- nc.nodeUpdateQueue.Add(newNode.Name)
- return nil
- }),
- DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
- nc.nodesToRetry.Delete(node.Name)
- nc.nodeEvictionMap.unregisterNode(node.Name)
- return nil
- }),
- })
- nc.leaseLister = leaseInformer.Lister()
- nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
- nc.nodeLister = nodeInformer.Lister()
- nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
- nc.daemonSetStore = daemonSetInformer.Lister()
- nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
- return nc, nil
- }
- // Run starts an asynchronous loop that monitors the status of cluster nodes.
- func (nc *Controller) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- klog.Infof("Starting node controller")
- defer klog.Infof("Shutting down node controller")
- if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
- return
- }
- if nc.runTaintManager {
- go nc.taintManager.Run(stopCh)
- }
- // Shut down the update queues to clean up their worker goroutines.
- defer nc.nodeUpdateQueue.ShutDown()
- defer nc.podUpdateQueue.ShutDown()
- // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
- for i := 0; i < scheduler.UpdateWorkerSize; i++ {
- // Thanks to "workqueue", each worker just need to get item from queue, because
- // the item is flagged when got from queue: if new event come, the new item will
- // be re-queued until "Done", so no more than one worker handle the same item and
- // no event missed.
- go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
- }
- for i := 0; i < podUpdateWorkerSize; i++ {
- go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
- }
- if nc.useTaintBasedEvictions {
- // Handling taint based evictions. Because we don't want dedicated logic in TaintManager for NC-originated
- // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
- go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
- } else {
- // Managing eviction of nodes:
- // When we delete pods off a node, if the node was not empty at the time, we
- // queue an eviction watcher. If we hit an error, retry deletion.
- go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
- }
- // Incorporate the results of node health signal pushed from kubelet to master.
- go wait.Until(func() {
- if err := nc.monitorNodeHealth(); err != nil {
- klog.Errorf("Error monitoring node health: %v", err)
- }
- }, nc.nodeMonitorPeriod, stopCh)
- <-stopCh
- }
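- // doNodeProcessingPassWorker consumes node names from nodeUpdateQueue, applying
- // condition-derived NoSchedule taints and reconciling OS/arch labels for each
- // node until the queue is shut down.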
- func (nc *Controller) doNodeProcessingPassWorker() {
- for {
- obj, shutdown := nc.nodeUpdateQueue.Get()
- // "nodeUpdateQueue" will be shutdown when "stopCh" closed;
- // we do not need to re-check "stopCh" again.
- if shutdown {
- return
- }
- nodeName := obj.(string)
- if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
- klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
- // TODO(k82cn): Add nodeName back to the queue
- }
- // TODO: re-evaluate whether there are any labels that need to be
- // reconciled in 1.19. Remove this function if it's no longer necessary.
- if err := nc.reconcileNodeLabels(nodeName); err != nil {
- klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
- // TODO(yujuhong): Add nodeName back to the queue
- }
- nc.nodeUpdateQueue.Done(nodeName)
- }
- }
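- // doNoScheduleTaintingPass computes the desired NoSchedule taints for a node from
- // its conditions and Unschedulable field, and swaps them with the NoSchedule taints
- // currently present on the node.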
- func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
- node, err := nc.nodeLister.Get(nodeName)
- if err != nil {
- // If node not found, just ignore it.
- if apierrors.IsNotFound(err) {
- return nil
- }
- return err
- }
- // Map node's condition to Taints.
- var taints []v1.Taint
- for _, condition := range node.Status.Conditions {
- if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
- if taintKey, found := taintMap[condition.Status]; found {
- taints = append(taints, v1.Taint{
- Key: taintKey,
- Effect: v1.TaintEffectNoSchedule,
- })
- }
- }
- }
- if node.Spec.Unschedulable {
- // If unschedulable, append related taint.
- taints = append(taints, v1.Taint{
- Key: v1.TaintNodeUnschedulable,
- Effect: v1.TaintEffectNoSchedule,
- })
- }
- // Get the existing taints of the node.
- nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
- // only NoSchedule taints are candidates to be compared with "taints" later
- if t.Effect != v1.TaintEffectNoSchedule {
- return false
- }
- // Find unschedulable taint of node.
- if t.Key == v1.TaintNodeUnschedulable {
- return true
- }
- // Find node condition taints of node.
- _, found := taintKeyToNodeConditionMap[t.Key]
- return found
- })
- taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
- // If there is nothing to add or delete, return directly.
- if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
- return nil
- }
- if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
- return fmt.Errorf("failed to swap taints of node %+v", node)
- }
- return nil
- }
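- // doNoExecuteTaintingPass drains the per-zone NoExecute tainter queues, adding the
- // NotReady or Unreachable NoExecute taint (and removing the opposite one) based on
- // each node's current Ready condition.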
- func (nc *Controller) doNoExecuteTaintingPass() {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- for k := range nc.zoneNoExecuteTainter {
- // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
- nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
- node, err := nc.nodeLister.Get(value.Value)
- if apierrors.IsNotFound(err) {
- klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
- return true, 0
- } else if err != nil {
- klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
- // retry in 50 milliseconds
- return false, 50 * time.Millisecond
- }
- _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
- // Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
- taintToAdd := v1.Taint{}
- oppositeTaint := v1.Taint{}
- switch condition.Status {
- case v1.ConditionFalse:
- taintToAdd = *NotReadyTaintTemplate
- oppositeTaint = *UnreachableTaintTemplate
- case v1.ConditionUnknown:
- taintToAdd = *UnreachableTaintTemplate
- oppositeTaint = *NotReadyTaintTemplate
- default:
- // It seems that the Node is ready again, so there's no need to taint it.
- klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
- return true, 0
- }
- result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
- if result {
- // count the eviction in the evictionsNumber metric
- zone := utilnode.GetZoneKey(node)
- evictionsNumber.WithLabelValues(zone).Inc()
- }
- return result, 0
- })
- }
- }
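- // doEvictionPass drains the per-zone pod evictor queues, deleting the pods on each
- // queued node and marking the node as evicted.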
- func (nc *Controller) doEvictionPass() {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- for k := range nc.zonePodEvictor {
- // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
- nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
- node, err := nc.nodeLister.Get(value.Value)
- if apierrors.IsNotFound(err) {
- klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
- } else if err != nil {
- klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
- }
- nodeUID, _ := value.UID.(string)
- pods, err := nc.getPodsAssignedToNode(value.Value)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
- return false, 0
- }
- remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
- if err != nil {
- // We are not setting eviction status here.
- // New pods will be handled by zonePodEvictor retry
- // instead of immediate pod eviction.
- utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
- return false, 0
- }
- if !nc.nodeEvictionMap.setStatus(value.Value, evicted) {
- klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value)
- }
- if remaining {
- klog.Infof("Pods awaiting deletion due to Controller eviction")
- }
- if node != nil {
- zone := utilnode.GetZoneKey(node)
- evictionsNumber.WithLabelValues(zone).Inc()
- }
- return true, 0
- })
- }
- }
- // monitorNodeHealth verifies that node health is constantly updated by the kubelet, and
- // if not, posts "NodeReady==ConditionUnknown".
- // For nodes that are not ready or not reachable for a long period of time,
- // this function taints them if the TaintBasedEvictions feature is enabled;
- // otherwise, it evicts their pods directly.
- func (nc *Controller) monitorNodeHealth() error {
- // We are listing nodes from the local cache as we can tolerate some small delays
- // compared to the state from etcd and there is eventual consistency anyway.
- nodes, err := nc.nodeLister.List(labels.Everything())
- if err != nil {
- return err
- }
- added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
- for i := range newZoneRepresentatives {
- nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
- }
- for i := range added {
- klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
- nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
- nc.knownNodeSet[added[i].Name] = added[i]
- nc.addPodEvictorForNewZone(added[i])
- if nc.useTaintBasedEvictions {
- nc.markNodeAsReachable(added[i])
- } else {
- nc.cancelPodEviction(added[i])
- }
- }
- for i := range deleted {
- klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
- nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
- delete(nc.knownNodeSet, deleted[i].Name)
- }
- zoneToNodeConditions := map[string][]*v1.NodeCondition{}
- for i := range nodes {
- var gracePeriod time.Duration
- var observedReadyCondition v1.NodeCondition
- var currentReadyCondition *v1.NodeCondition
- node := nodes[i].DeepCopy()
- if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
- gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
- if err == nil {
- return true, nil
- }
- name := node.Name
- node, err = nc.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
- if err != nil {
- klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
- return false, err
- }
- return false, nil
- }); err != nil {
- klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
- "Skipping - no pods will be evicted.", node.Name, err)
- continue
- }
- // Some nodes may be excluded from disruption checking
- if !isNodeExcludedFromDisruptionChecks(node) {
- zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
- }
- if currentReadyCondition != nil {
- pods, err := nc.getPodsAssignedToNode(node.Name)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
- if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
- // If error happened during node status transition (Ready -> NotReady)
- // we need to mark node for retry to force MarkPodsNotReady execution
- // in the next iteration.
- nc.nodesToRetry.Store(node.Name, struct{}{})
- }
- continue
- }
- if nc.useTaintBasedEvictions {
- nc.processTaintBaseEviction(node, &observedReadyCondition)
- } else {
- if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to evict all pods from node %v: %v; queuing for retry", node.Name, err))
- }
- }
- _, needsRetry := nc.nodesToRetry.Load(node.Name)
- switch {
- case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
- // Report node event only once when status changed.
- nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
- fallthrough
- case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
- if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
- nc.nodesToRetry.Store(node.Name, struct{}{})
- continue
- }
- }
- }
- nc.nodesToRetry.Delete(node.Name)
- }
- nc.handleDisruption(zoneToNodeConditions, nodes)
- return nil
- }
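- // processTaintBaseEviction queues the node for NoExecute tainting (or swaps the
- // existing taint straight away) when it is NotReady or unreachable, and removes the
- // taints when the node becomes Ready again.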
- func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
- decisionTimestamp := nc.now()
- // Check eviction timeout against decisionTimestamp
- switch observedReadyCondition.Status {
- case v1.ConditionFalse:
- // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
- if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
- taintToAdd := *NotReadyTaintTemplate
- if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
- klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
- }
- } else if nc.markNodeForTainting(node) {
- klog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
- node.Name,
- decisionTimestamp,
- )
- }
- case v1.ConditionUnknown:
- // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
- if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
- taintToAdd := *UnreachableTaintTemplate
- if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
- klog.Errorf("Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle.")
- }
- } else if nc.markNodeForTainting(node) {
- klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
- node.Name,
- decisionTimestamp,
- )
- }
- case v1.ConditionTrue:
- removed, err := nc.markNodeAsReachable(node)
- if err != nil {
- klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
- }
- if removed {
- klog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
- }
- }
- }
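- // processNoTaintBaseEviction enqueues pod evictions for the node once it has been
- // NotReady or unresponsive for longer than podEvictionTimeout, and cancels queued
- // evictions when the node becomes Ready again.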
- func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
- decisionTimestamp := nc.now()
- nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
- if nodeHealthData == nil {
- return fmt.Errorf("health data doesn't exist for node %q", node.Name)
- }
- // Check eviction timeout against decisionTimestamp
- switch observedReadyCondition.Status {
- case v1.ConditionFalse:
- if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
- enqueued, err := nc.evictPods(node, pods)
- if err != nil {
- return err
- }
- if enqueued {
- klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
- node.Name,
- decisionTimestamp,
- nodeHealthData.readyTransitionTimestamp,
- nc.podEvictionTimeout,
- )
- }
- }
- case v1.ConditionUnknown:
- if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
- enqueued, err := nc.evictPods(node, pods)
- if err != nil {
- return err
- }
- if enqueued {
- klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
- node.Name,
- decisionTimestamp,
- nodeHealthData.readyTransitionTimestamp,
- nc.podEvictionTimeout-gracePeriod,
- )
- }
- }
- case v1.ConditionTrue:
- if nc.cancelPodEviction(node) {
- klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
- }
- }
- return nil
- }
- // labelNodeDisruptionExclusion is a label on nodes that controls whether they are
- // excluded from being considered for disruption checks by the node controller.
- const labelNodeDisruptionExclusion = "node.kubernetes.io/exclude-disruption"
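- // isNodeExcludedFromDisruptionChecks reports whether the node should be left out of
- // zone disruption calculations, either because of the legacy master-name heuristic
- // or because it carries the exclude-disruption label.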
- func isNodeExcludedFromDisruptionChecks(node *v1.Node) bool {
- // DEPRECATED: will be removed in 1.19
- if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LegacyNodeRoleBehavior) {
- if legacyIsMasterNode(node.Name) {
- return true
- }
- }
- if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NodeDisruptionExclusion) {
- if _, ok := node.Labels[labelNodeDisruptionExclusion]; ok {
- return true
- }
- }
- return false
- }
- // legacyIsMasterNode returns true if given node is a registered master according
- // to the logic historically used for this function. This code path is deprecated
- // and the node disruption exclusion label should be used in the future.
- // This code will not be allowed to update to use the node-role label, since
- // node-roles may not be used for feature enablement.
- // DEPRECATED: Will be removed in 1.19
- func legacyIsMasterNode(nodeName string) bool {
- // We are trying to capture the "master(-...)?$" regexp.
- // However, using regexp.MatchString() results in more than 35%
- // of all space allocations in ControllerManager being spent in this function.
- // That's why we are trying to be a bit smarter.
- if strings.HasSuffix(nodeName, "master") {
- return true
- }
- if len(nodeName) >= 10 {
- return strings.HasSuffix(nodeName[:len(nodeName)-3], "master-")
- }
- return false
- }
- // tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns grace period to
- // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
- func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
- nodeHealth := nc.nodeHealthMap.getDeepCopy(node.Name)
- defer func() {
- nc.nodeHealthMap.set(node.Name, nodeHealth)
- }()
- var gracePeriod time.Duration
- var observedReadyCondition v1.NodeCondition
- _, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
- if currentReadyCondition == nil {
- // If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
- // A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime are set
- // to node.CreationTimestamp to avoid handling the corner case.
- observedReadyCondition = v1.NodeCondition{
- Type: v1.NodeReady,
- Status: v1.ConditionUnknown,
- LastHeartbeatTime: node.CreationTimestamp,
- LastTransitionTime: node.CreationTimestamp,
- }
- gracePeriod = nc.nodeStartupGracePeriod
- if nodeHealth != nil {
- nodeHealth.status = &node.Status
- } else {
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: node.CreationTimestamp,
- readyTransitionTimestamp: node.CreationTimestamp,
- }
- }
- } else {
- // If ready condition is not nil, make a copy of it, since we may modify it in place later.
- observedReadyCondition = *currentReadyCondition
- gracePeriod = nc.nodeMonitorGracePeriod
- }
- // There are the following cases to check:
- // - both saved and new status have no Ready Condition set - we leave everything as it is,
- // - saved status has no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd,
- // - saved status has some Ready Condition, but current one does not - it's an error, but we fill it up because that's probably a good thing to do,
- // - both saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
- // unresponsive, so we leave it as it is,
- // - both saved and current statuses have Ready Conditions, they have different LastProbeTimes, but the same Ready Condition State -
- // everything's in order, no transition occurred, we update only probeTimestamp,
- // - both saved and current statuses have Ready Conditions, different LastProbeTimes and different Ready Condition States -
- // the Ready Condition changed its state since we last saw it, so we update both probeTimestamp and readyTransitionTimestamp.
- // TODO: things to consider:
- // - if 'LastProbeTime' has gone back in time it's probably an error, currently we ignore it,
- // - currently the only correct Ready State transition outside of the Node Controller is the Kubelet marking it ready; we don't check
- // if that's the case, but it does not seem necessary.
- var savedCondition *v1.NodeCondition
- var savedLease *coordv1.Lease
- if nodeHealth != nil {
- _, savedCondition = nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
- savedLease = nodeHealth.lease
- }
- if nodeHealth == nil {
- klog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: nc.now(),
- readyTransitionTimestamp: nc.now(),
- }
- } else if savedCondition == nil && currentReadyCondition != nil {
- klog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: nc.now(),
- readyTransitionTimestamp: nc.now(),
- }
- } else if savedCondition != nil && currentReadyCondition == nil {
- klog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
- // TODO: figure out what to do in this case. For now we do the same thing as above.
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: nc.now(),
- readyTransitionTimestamp: nc.now(),
- }
- } else if savedCondition != nil && currentReadyCondition != nil && savedCondition.LastHeartbeatTime != currentReadyCondition.LastHeartbeatTime {
- var transitionTime metav1.Time
- // If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
- // otherwise we leave it as it is.
- if savedCondition.LastTransitionTime != currentReadyCondition.LastTransitionTime {
- klog.V(3).Infof("ReadyCondition for Node %s transitioned from %v to %v", node.Name, savedCondition, currentReadyCondition)
- transitionTime = nc.now()
- } else {
- transitionTime = nodeHealth.readyTransitionTimestamp
- }
- if klog.V(5) {
- klog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, nodeHealth.status, node.Status)
- } else {
- klog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
- }
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: nc.now(),
- readyTransitionTimestamp: transitionTime,
- }
- }
- // Always update the probe time if node lease is renewed.
- // Note: If kubelet never posted the node status, but continues renewing the
- // heartbeat leases, the node controller will assume the node is healthy and
- // take no action.
- observedLease, _ := nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
- if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
- nodeHealth.lease = observedLease
- nodeHealth.probeTimestamp = nc.now()
- }
- if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
- // NodeReady condition or lease was last set longer ago than gracePeriod, so
- // update it to Unknown (regardless of its current value) in the master.
- nodeConditionTypes := []v1.NodeConditionType{
- v1.NodeReady,
- v1.NodeMemoryPressure,
- v1.NodeDiskPressure,
- v1.NodePIDPressure,
- // We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.
- // v1.NodeNetworkUnavailable,
- }
- nowTimestamp := nc.now()
- for _, nodeConditionType := range nodeConditionTypes {
- _, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
- if currentCondition == nil {
- klog.V(2).Infof("Condition %v of node %v was never updated by kubelet", nodeConditionType, node.Name)
- node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
- Type: nodeConditionType,
- Status: v1.ConditionUnknown,
- Reason: "NodeStatusNeverUpdated",
- Message: "Kubelet never posted node status.",
- LastHeartbeatTime: node.CreationTimestamp,
- LastTransitionTime: nowTimestamp,
- })
- } else {
- klog.V(2).Infof("node %v hasn't been updated for %+v. Last %v is: %+v",
- node.Name, nc.now().Time.Sub(nodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
- if currentCondition.Status != v1.ConditionUnknown {
- currentCondition.Status = v1.ConditionUnknown
- currentCondition.Reason = "NodeStatusUnknown"
- currentCondition.Message = "Kubelet stopped posting node status."
- currentCondition.LastTransitionTime = nowTimestamp
- }
- }
- }
- // We need to update currentReadyCondition because its value may have changed.
- _, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
- if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
- if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
- klog.Errorf("Error updating node %s: %v", node.Name, err)
- return gracePeriod, observedReadyCondition, currentReadyCondition, err
- }
- nodeHealth = &nodeHealthData{
- status: &node.Status,
- probeTimestamp: nodeHealth.probeTimestamp,
- readyTransitionTimestamp: nc.now(),
- lease: observedLease,
- }
- return gracePeriod, observedReadyCondition, currentReadyCondition, nil
- }
- }
- return gracePeriod, observedReadyCondition, currentReadyCondition, nil
- }
- func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
- newZoneStates := map[string]ZoneState{}
- allAreFullyDisrupted := true
- for k, v := range zoneToNodeConditions {
- zoneSize.WithLabelValues(k).Set(float64(len(v)))
- unhealthy, newState := nc.computeZoneStateFunc(v)
- zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
- unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
- if newState != stateFullDisruption {
- allAreFullyDisrupted = false
- }
- newZoneStates[k] = newState
- if _, had := nc.zoneStates[k]; !had {
- klog.Errorf("Setting initial state for unseen zone: %v", k)
- nc.zoneStates[k] = stateInitial
- }
- }
- allWasFullyDisrupted := true
- for k, v := range nc.zoneStates {
- if _, have := zoneToNodeConditions[k]; !have {
- zoneSize.WithLabelValues(k).Set(0)
- zoneHealth.WithLabelValues(k).Set(100)
- unhealthyNodes.WithLabelValues(k).Set(0)
- delete(nc.zoneStates, k)
- continue
- }
- if v != stateFullDisruption {
- allWasFullyDisrupted = false
- break
- }
- }
- // At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
- // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
- // - if the new state is "normal" we resume normal operation (go back to default limiter settings),
- // - if the new state is "fullDisruption" we restore the normal eviction rate,
- // - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
- if !allAreFullyDisrupted || !allWasFullyDisrupted {
- // We're switching to full disruption mode
- if allAreFullyDisrupted {
- klog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
- for i := range nodes {
- if nc.useTaintBasedEvictions {
- _, err := nc.markNodeAsReachable(nodes[i])
- if err != nil {
- klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
- }
- } else {
- nc.cancelPodEviction(nodes[i])
- }
- }
- // We stop all evictions.
- for k := range nc.zoneStates {
- if nc.useTaintBasedEvictions {
- nc.zoneNoExecuteTainter[k].SwapLimiter(0)
- } else {
- nc.zonePodEvictor[k].SwapLimiter(0)
- }
- }
- for k := range nc.zoneStates {
- nc.zoneStates[k] = stateFullDisruption
- }
- // All rate limiters are updated, so we can return early here.
- return
- }
- // We're exiting full disruption mode
- if allWasFullyDisrupted {
- klog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
- // When exiting disruption mode update probe timestamps on all Nodes.
- now := nc.now()
- for i := range nodes {
- v := nc.nodeHealthMap.getDeepCopy(nodes[i].Name)
- v.probeTimestamp = now
- v.readyTransitionTimestamp = now
- nc.nodeHealthMap.set(nodes[i].Name, v)
- }
- // We reset all rate limiters to settings appropriate for the given state.
- for k := range nc.zoneStates {
- nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
- nc.zoneStates[k] = newZoneStates[k]
- }
- return
- }
- // We know that there's at least one zone that is not fully disrupted, so
- // we can use the default behavior for rate limiters.
- for k, v := range nc.zoneStates {
- newState := newZoneStates[k]
- if v == newState {
- continue
- }
- klog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
- nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
- nc.zoneStates[k] = newState
- }
- }
- }
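- // podUpdated enqueues a pod for processing when it is newly assigned to a node
- // (or its node assignment changes).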
- func (nc *Controller) podUpdated(oldPod, newPod *v1.Pod) {
- if newPod == nil {
- return
- }
- if len(newPod.Spec.NodeName) != 0 && (oldPod == nil || newPod.Spec.NodeName != oldPod.Spec.NodeName) {
- podItem := podUpdateItem{newPod.Namespace, newPod.Name}
- nc.podUpdateQueue.Add(podItem)
- }
- }
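- // doPodProcessingWorker consumes items from podUpdateQueue and processes them
- // until the queue is shut down.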
- func (nc *Controller) doPodProcessingWorker() {
- for {
- obj, shutdown := nc.podUpdateQueue.Get()
- // "podUpdateQueue" will be shutdown when "stopCh" closed;
- // we do not need to re-check "stopCh" again.
- if shutdown {
- return
- }
- podItem := obj.(podUpdateItem)
- nc.processPod(podItem)
- }
- }
- // processPod processes events of pods being assigned to nodes. In particular:
- // 1. for a NodeReady=true node, eviction for this pod will be cancelled
- // 2. for a NodeReady=false or unknown node, eviction of the pod will happen and the pod will be marked as not ready
- // 3. if the node doesn't exist in the cache, the pod will be skipped and handled later by doEvictionPass
- func (nc *Controller) processPod(podItem podUpdateItem) {
- defer nc.podUpdateQueue.Done(podItem)
- pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
- if err != nil {
- if apierrors.IsNotFound(err) {
- // If the pod was deleted, there is no need to requeue.
- return
- }
- klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err)
- nc.podUpdateQueue.AddRateLimited(podItem)
- return
- }
- nodeName := pod.Spec.NodeName
- nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
- if nodeHealth == nil {
- // Node data is not gathered yet or the node has been removed in the meantime.
- // Pod will be handled by doEvictionPass method.
- return
- }
- node, err := nc.nodeLister.Get(nodeName)
- if err != nil {
- klog.Warningf("Failed to read node %v: %v.", nodeName, err)
- nc.podUpdateQueue.AddRateLimited(podItem)
- return
- }
- _, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
- if currentReadyCondition == nil {
- // Lack of a NodeReady condition may only happen after node addition (or if the condition is maliciously deleted).
- // In both cases, the pod will be handled correctly (evicted if needed) during processing
- // of the next node update event.
- return
- }
- pods := []*v1.Pod{pod}
- // In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
- // Pods are processed by TaintManager.
- if !nc.useTaintBasedEvictions {
- if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
- klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
- nc.podUpdateQueue.AddRateLimited(podItem)
- return
- }
- }
- if currentReadyCondition.Status != v1.ConditionTrue {
- if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
- klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
- nc.podUpdateQueue.AddRateLimited(podItem)
- }
- }
- }
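- // setLimiterInZone adjusts the rate limiter of the zone's tainter or pod evictor
- // queue according to the zone's state and size.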
- func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
- switch state {
- case stateNormal:
- if nc.useTaintBasedEvictions {
- nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
- } else {
- nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
- }
- case statePartialDisruption:
- if nc.useTaintBasedEvictions {
- nc.zoneNoExecuteTainter[zone].SwapLimiter(
- nc.enterPartialDisruptionFunc(zoneSize))
- } else {
- nc.zonePodEvictor[zone].SwapLimiter(
- nc.enterPartialDisruptionFunc(zoneSize))
- }
- case stateFullDisruption:
- if nc.useTaintBasedEvictions {
- nc.zoneNoExecuteTainter[zone].SwapLimiter(
- nc.enterFullDisruptionFunc(zoneSize))
- } else {
- nc.zonePodEvictor[zone].SwapLimiter(
- nc.enterFullDisruptionFunc(zoneSize))
- }
- }
- }
- // classifyNodes classifies allNodes into three categories:
- // 1. added: nodes that are in 'allNodes', but not in 'knownNodeSet'
- // 2. deleted: nodes that are in 'knownNodeSet', but not in 'allNodes'
- // 3. newZoneRepresentatives: nodes that are in both 'knownNodeSet' and 'allNodes', but whose zones have no zone state yet
- func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
- for i := range allNodes {
- if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
- added = append(added, allNodes[i])
- } else {
- // Currently, we only consider new zone as updated.
- zone := utilnode.GetZoneKey(allNodes[i])
- if _, found := nc.zoneStates[zone]; !found {
- newZoneRepresentatives = append(newZoneRepresentatives, allNodes[i])
- }
- }
- }
- // If there's a difference between lengths of known Nodes and observed nodes
- // we must have removed some Node.
- if len(nc.knownNodeSet)+len(added) != len(allNodes) {
- knowSetCopy := map[string]*v1.Node{}
- for k, v := range nc.knownNodeSet {
- knowSetCopy[k] = v
- }
- for i := range allNodes {
- delete(knowSetCopy, allNodes[i].Name)
- }
- for i := range knowSetCopy {
- deleted = append(deleted, knowSetCopy[i])
- }
- }
- return
- }
- // HealthyQPSFunc returns the default value for cluster eviction rate - we take
- // nodeNum for consistency with ReducedQPSFunc.
- func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
- return nc.evictionLimiterQPS
- }
- // ReducedQPSFunc returns the QPS for when the cluster is large: make
- // evictions slower; if the cluster is small, stop evictions altogether.
- func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
- if int32(nodeNum) > nc.largeClusterThreshold {
- return nc.secondaryEvictionLimiterQPS
- }
- return 0
- }
- // addPodEvictorForNewZone checks if a new zone appeared, and if so adds a new evictor.
- func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- zone := utilnode.GetZoneKey(node)
- if _, found := nc.zoneStates[zone]; !found {
- nc.zoneStates[zone] = stateInitial
- if !nc.useTaintBasedEvictions {
- nc.zonePodEvictor[zone] =
- scheduler.NewRateLimitedTimedQueue(
- flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
- } else {
- nc.zoneNoExecuteTainter[zone] =
- scheduler.NewRateLimitedTimedQueue(
- flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
- }
- // Init the metric for the new zone.
- klog.Infof("Initializing eviction metric for zone: %v", zone)
- evictionsNumber.WithLabelValues(zone).Add(0)
- }
- }
- // cancelPodEviction removes any queued evictions, typically because the node is available again. It
- // returns true if an eviction was queued.
- func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
- zone := utilnode.GetZoneKey(node)
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- if !nc.nodeEvictionMap.setStatus(node.Name, unmarked) {
- klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
- }
- wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
- if wasDeleting {
- klog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
- return true
- }
- return false
- }
- // evictPods:
- // - adds node to evictor queue if the node is not marked as evicted.
- // Returns false if the node name was already enqueued.
- // - deletes pods immediately if node is already marked as evicted.
- // Returns false, because the node wasn't added to the queue.
- func (nc *Controller) evictPods(node *v1.Node, pods []*v1.Pod) (bool, error) {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- status, ok := nc.nodeEvictionMap.getStatus(node.Name)
- if ok && status == evicted {
- // Node eviction already happened for this node.
- // Handling immediate pod deletion.
- _, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore)
- if err != nil {
- return false, fmt.Errorf("unable to delete pods from node %q: %v", node.Name, err)
- }
- return false, nil
- }
- if !nc.nodeEvictionMap.setStatus(node.Name, toBeEvicted) {
- klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
- }
- return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil
- }
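- // markNodeForTainting adds the node to its zone's NoExecute tainter queue.
- // It returns false if the node was already enqueued.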
- func (nc *Controller) markNodeForTainting(node *v1.Node) bool {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
- }
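- // markNodeAsReachable removes the NotReady and Unreachable NoExecute taints from the
- // node and removes it from the zone's tainter queue. It returns true if the node was
- // in the queue.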
- func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
- nc.evictorLock.Lock()
- defer nc.evictorLock.Unlock()
- err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
- if err != nil {
- klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
- return false, err
- }
- err = controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, NotReadyTaintTemplate)
- if err != nil {
- klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
- return false, err
- }
- return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
- }
- // ComputeZoneState computes the disruption state of a zone from the NodeReady conditions
- // of its Nodes, returning the number of not-Ready Nodes and the ZoneState. The zone is considered:
- // - fullyDisrupted if there are no Ready Nodes,
- // - partiallyDisrupted if more than two Nodes are not Ready and at least the nc.unhealthyZoneThreshold fraction of Nodes are not Ready,
- // - normal otherwise
- func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
- readyNodes := 0
- notReadyNodes := 0
- for i := range nodeReadyConditions {
- if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
- readyNodes++
- } else {
- notReadyNodes++
- }
- }
- switch {
- case readyNodes == 0 && notReadyNodes > 0:
- return notReadyNodes, stateFullDisruption
- case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
- return notReadyNodes, statePartialDisruption
- default:
- return notReadyNodes, stateNormal
- }
- }
- // reconcileNodeLabels reconciles node labels.
- func (nc *Controller) reconcileNodeLabels(nodeName string) error {
- node, err := nc.nodeLister.Get(nodeName)
- if err != nil {
- // If node not found, just ignore it.
- if apierrors.IsNotFound(err) {
- return nil
- }
- return err
- }
- if node.Labels == nil {
- // Nothing to reconcile.
- return nil
- }
- labelsToUpdate := map[string]string{}
- for _, r := range labelReconcileInfo {
- primaryValue, primaryExists := node.Labels[r.primaryKey]
- secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
- if !primaryExists {
- // The primary label key does not exist. This should not happen
- // within our supported version skew range, when no external
- // components/factors are modifying the node object. Ignore this case.
- continue
- }
- if secondaryExists && primaryValue != secondaryValue {
- // Secondary label exists, but not consistent with the primary
- // label. Need to reconcile.
- labelsToUpdate[r.secondaryKey] = primaryValue
- } else if !secondaryExists && r.ensureSecondaryExists {
- // Apply secondary label based on primary label.
- labelsToUpdate[r.secondaryKey] = primaryValue
- }
- }
- if len(labelsToUpdate) == 0 {
- return nil
- }
- if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
- return fmt.Errorf("failed update labels for node %+v", node)
- }
- return nil
- }