node_lifecycle_controller.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The Controller sets tainted annotations on nodes.
// Tainted nodes should not be used for new workloads and
// some effort should be given to getting existing workloads
// off of tainted nodes.
package nodelifecycle
import (
    "fmt"
    "sync"
    "time"

    "k8s.io/klog"

    coordv1beta1 "k8s.io/api/coordination/v1beta1"
    "k8s.io/api/core/v1"
    apiequality "k8s.io/apimachinery/pkg/api/equality"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    appsv1informers "k8s.io/client-go/informers/apps/v1"
    coordinformers "k8s.io/client-go/informers/coordination/v1beta1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    appsv1listers "k8s.io/client-go/listers/apps/v1"
    coordlisters "k8s.io/client-go/listers/coordination/v1beta1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/flowcontrol"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
    nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
    "k8s.io/kubernetes/pkg/features"
    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
    "k8s.io/kubernetes/pkg/util/metrics"
    utilnode "k8s.io/kubernetes/pkg/util/node"
    "k8s.io/kubernetes/pkg/util/system"
    taintutils "k8s.io/kubernetes/pkg/util/taints"
)
func init() {
    // Register prometheus metrics
    Register()
}
var (
    // UnreachableTaintTemplate is the taint for when a node becomes unreachable.
    UnreachableTaintTemplate = &v1.Taint{
        Key:    schedulerapi.TaintNodeUnreachable,
        Effect: v1.TaintEffectNoExecute,
    }

    // NotReadyTaintTemplate is the taint for when a node is not ready for
    // executing pods
    NotReadyTaintTemplate = &v1.Taint{
        Key:    schedulerapi.TaintNodeNotReady,
        Effect: v1.TaintEffectNoExecute,
    }

    // map {NodeConditionType: {ConditionStatus: TaintKey}}
    // represents which NodeConditionType under which ConditionStatus should be
    // tainted with which TaintKey.
    // For certain NodeConditionTypes, there are multiple {ConditionStatus,TaintKey} pairs.
    nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
        v1.NodeReady: {
            v1.ConditionFalse:   schedulerapi.TaintNodeNotReady,
            v1.ConditionUnknown: schedulerapi.TaintNodeUnreachable,
        },
        v1.NodeMemoryPressure: {
            v1.ConditionTrue: schedulerapi.TaintNodeMemoryPressure,
        },
        v1.NodeDiskPressure: {
            v1.ConditionTrue: schedulerapi.TaintNodeDiskPressure,
        },
        v1.NodeNetworkUnavailable: {
            v1.ConditionTrue: schedulerapi.TaintNodeNetworkUnavailable,
        },
        v1.NodePIDPressure: {
            v1.ConditionTrue: schedulerapi.TaintNodePIDPressure,
        },
    }

    taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
        schedulerapi.TaintNodeNotReady:           v1.NodeReady,
        schedulerapi.TaintNodeUnreachable:        v1.NodeReady,
        schedulerapi.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
        schedulerapi.TaintNodeMemoryPressure:     v1.NodeMemoryPressure,
        schedulerapi.TaintNodeDiskPressure:       v1.NodeDiskPressure,
        schedulerapi.TaintNodePIDPressure:        v1.NodePIDPressure,
    }
)
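
// Illustrative sketch (not part of the original file): how the mapping above
// is meant to be read. A node reporting the condition MemoryPressure=True is
// expected to carry the corresponding NoSchedule taint:
//
//     taintKey := nodeConditionToTaintKeyStatusMap[v1.NodeMemoryPressure][v1.ConditionTrue]
//     // taintKey == schedulerapi.TaintNodeMemoryPressure; the controller applies it
//     // with effect NoSchedule in doNoScheduleTaintingPass below.
//
// taintKeyToNodeConditionMap is the reverse index, used to recognize which of a
// node's existing taints were derived from conditions.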
// ZoneState is the state of a given zone.
type ZoneState string

const (
    stateInitial           = ZoneState("Initial")
    stateNormal            = ZoneState("Normal")
    stateFullDisruption    = ZoneState("FullDisruption")
    statePartialDisruption = ZoneState("PartialDisruption")
)

const (
    // The amount of time the nodecontroller should sleep between retrying node health updates
    retrySleepTime = 20 * time.Millisecond
)
// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
// primaryKey and secondaryKey are keys of labels to reconcile.
//   - If both keys exist but their values don't match, use the value from the
//     primaryKey as the source of truth to reconcile.
//   - If ensureSecondaryExists is true and the secondaryKey does not
//     exist, secondaryKey will be added with the value of the primaryKey.
var labelReconcileInfo = []struct {
    primaryKey            string
    secondaryKey          string
    ensureSecondaryExists bool
}{
    {
        // Reconcile the beta and the stable OS label using the beta label as
        // the source of truth.
        // TODO(#73084): switch to using the stable label as the source of
        // truth in v1.18.
        primaryKey:            kubeletapis.LabelOS,
        secondaryKey:          v1.LabelOSStable,
        ensureSecondaryExists: true,
    },
    {
        // Reconcile the beta and the stable arch label using the beta label as
        // the source of truth.
        // TODO(#73084): switch to using the stable label as the source of
        // truth in v1.18.
        primaryKey:            kubeletapis.LabelArch,
        secondaryKey:          v1.LabelArchStable,
        ensureSecondaryExists: true,
    },
}
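
// Illustrative sketch (assumed label values, not from this file): with the
// table above, a node labeled only with the beta OS label
//
//     beta.kubernetes.io/os: linux
//
// is reconciled so that the stable key is added with the beta value:
//
//     beta.kubernetes.io/os: linux
//     kubernetes.io/os:      linux
//
// If both keys exist but disagree, the beta (primary) value wins; see
// reconcileNodeLabels at the end of this file.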
type nodeHealthData struct {
    probeTimestamp           metav1.Time
    readyTransitionTimestamp metav1.Time
    status                   *v1.NodeStatus
    lease                    *coordv1beta1.Lease
}
// Controller is the controller that manages node's life cycle.
type Controller struct {
    taintManager *scheduler.NoExecuteTaintManager

    podInformerSynced cache.InformerSynced
    kubeClient        clientset.Interface

    // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
    // to avoid the problem with time skew across the cluster.
    now func() metav1.Time

    enterPartialDisruptionFunc func(nodeNum int) float32
    enterFullDisruptionFunc    func(nodeNum int) float32
    computeZoneStateFunc       func(nodeConditions []*v1.NodeCondition) (int, ZoneState)

    knownNodeSet map[string]*v1.Node
    // per Node map storing last observed health together with a local time when it was observed.
    nodeHealthMap map[string]*nodeHealthData

    // Lock to access evictor workers
    evictorLock sync.Mutex

    // workers that evict pods from unresponsive nodes.
    zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
    // workers that are responsible for tainting nodes.
    zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue

    zoneStates map[string]ZoneState

    daemonSetStore          appsv1listers.DaemonSetLister
    daemonSetInformerSynced cache.InformerSynced

    leaseLister         coordlisters.LeaseLister
    leaseInformerSynced cache.InformerSynced
    nodeLister          corelisters.NodeLister
    nodeInformerSynced  cache.InformerSynced

    recorder record.EventRecorder

    // Value controlling the Controller monitoring period, i.e. how often the Controller
    // checks the node health signal posted by the kubelet. This value should be lower than
    // nodeMonitorGracePeriod.
    // TODO: Change node health monitor to watch based.
    nodeMonitorPeriod time.Duration

    // When a node is just created, e.g. at cluster bootstrap or node creation, we give
    // it a longer grace period.
    nodeStartupGracePeriod time.Duration

    // The Controller will not proactively sync node health, but will monitor node
    // health signals updated from the kubelet. There are two kinds of node health
    // signals: NodeStatus and NodeLease. The NodeLease signal is generated only when
    // the NodeLease feature is enabled. If the Controller doesn't receive an update
    // for this amount of time, it will start posting "NodeReady==ConditionUnknown".
    // The amount of time before the Controller starts evicting pods is controlled
    // via the flag 'pod-eviction-timeout'.
    // Note: be cautious when changing the constant, it must work with
    // nodeStatusUpdateFrequency in the kubelet and renewInterval in the NodeLease
    // controller. The node health signal update frequency is the minimum of the
    // two.
    // There are several constraints:
    // 1. nodeMonitorGracePeriod must be N times more than the node health signal
    //    update frequency, where N is the number of retries allowed for the kubelet
    //    to post node status/lease. It is pointless to make nodeMonitorGracePeriod
    //    less than the node health signal update frequency, since there will
    //    only be fresh values from the kubelet at an interval of the node health
    //    signal update frequency. The constant must also be less than podEvictionTimeout.
    // 2. nodeMonitorGracePeriod can't be too large for user experience - a larger
    //    value means it takes longer for users to see up-to-date node health.
    nodeMonitorGracePeriod time.Duration

    podEvictionTimeout          time.Duration
    evictionLimiterQPS          float32
    secondaryEvictionLimiterQPS float32
    largeClusterThreshold       int32
    unhealthyZoneThreshold      float32

    // if set to true, the Controller will start a TaintManager that will evict Pods from
    // tainted nodes, if they're not tolerated.
    runTaintManager bool

    // if set to true, the Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
    // taints instead of evicting Pods itself.
    useTaintBasedEvictions bool

    // if set to true, the Controller will taint Nodes based on their conditions for 'NetworkUnavailable',
    // 'MemoryPressure', 'PIDPressure' and 'DiskPressure'.
    taintNodeByCondition bool

    nodeUpdateQueue workqueue.Interface
}
// NewNodeLifecycleController returns a new taint controller.
func NewNodeLifecycleController(
    leaseInformer coordinformers.LeaseInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    daemonSetInformer appsv1informers.DaemonSetInformer,
    kubeClient clientset.Interface,
    nodeMonitorPeriod time.Duration,
    nodeStartupGracePeriod time.Duration,
    nodeMonitorGracePeriod time.Duration,
    podEvictionTimeout time.Duration,
    evictionLimiterQPS float32,
    secondaryEvictionLimiterQPS float32,
    largeClusterThreshold int32,
    unhealthyZoneThreshold float32,
    runTaintManager bool,
    useTaintBasedEvictions bool,
    taintNodeByCondition bool) (*Controller, error) {

    if kubeClient == nil {
        klog.Fatalf("kubeClient is nil when starting Controller")
    }

    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
    eventBroadcaster.StartLogging(klog.Infof)

    klog.Infof("Sending events to api server.")
    eventBroadcaster.StartRecordingToSink(
        &v1core.EventSinkImpl{
            Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
        })

    if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    nc := &Controller{
        kubeClient:                  kubeClient,
        now:                         metav1.Now,
        knownNodeSet:                make(map[string]*v1.Node),
        nodeHealthMap:               make(map[string]*nodeHealthData),
        recorder:                    recorder,
        nodeMonitorPeriod:           nodeMonitorPeriod,
        nodeStartupGracePeriod:      nodeStartupGracePeriod,
        nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
        zonePodEvictor:              make(map[string]*scheduler.RateLimitedTimedQueue),
        zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
        zoneStates:                  make(map[string]ZoneState),
        podEvictionTimeout:          podEvictionTimeout,
        evictionLimiterQPS:          evictionLimiterQPS,
        secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
        largeClusterThreshold:       largeClusterThreshold,
        unhealthyZoneThreshold:      unhealthyZoneThreshold,
        runTaintManager:             runTaintManager,
        useTaintBasedEvictions:      useTaintBasedEvictions && runTaintManager,
        taintNodeByCondition:        taintNodeByCondition,
        nodeUpdateQueue:             workqueue.NewNamed("node_lifecycle_controller"),
    }
    if useTaintBasedEvictions {
        klog.Infof("Controller is using taint based evictions.")
    }

    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    nc.computeZoneStateFunc = nc.ComputeZoneState

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(nil, pod)
            }
        },
        UpdateFunc: func(prev, obj interface{}) {
            prevPod := prev.(*v1.Pod)
            newPod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(prevPod, newPod)
            }
        },
        DeleteFunc: func(obj interface{}) {
            pod, isPod := obj.(*v1.Pod)
            // We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
            if !isPod {
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    klog.Errorf("Received unexpected object: %v", obj)
                    return
                }
                pod, ok = deletedState.Obj.(*v1.Pod)
                if !ok {
                    klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
                    return
                }
            }
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(pod, nil)
            }
        },
    })
    nc.podInformerSynced = podInformer.Informer().HasSynced

    if nc.runTaintManager {
        podLister := podInformer.Lister()
        podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
        nodeLister := nodeInformer.Lister()
        nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
        nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter)
        nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(nil, node)
                return nil
            }),
            UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
                nc.taintManager.NodeUpdated(oldNode, newNode)
                return nil
            }),
            DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(node, nil)
                return nil
            }),
        })
    }

    klog.Infof("Controller will reconcile labels.")
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
            nc.nodeUpdateQueue.Add(node.Name)
            return nil
        }),
        UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
            nc.nodeUpdateQueue.Add(newNode.Name)
            return nil
        }),
    })

    if nc.taintNodeByCondition {
        klog.Infof("Controller will taint node by condition.")
    }

    nc.leaseLister = leaseInformer.Lister()
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
    } else {
        // Always indicate that lease is synced to prevent syncing lease.
        nc.leaseInformerSynced = func() bool { return true }
    }

    nc.nodeLister = nodeInformer.Lister()
    nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

    nc.daemonSetStore = daemonSetInformer.Lister()
    nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

    return nc, nil
}
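
// Illustrative sketch (hypothetical wiring, not part of this file): how a
// controller manager might construct and start this controller. The informer
// factory, client, stopCh, and all flag values below are assumptions for the
// example, not values taken from this file:
//
//     factory := informers.NewSharedInformerFactory(client, 0)
//     nc, err := NewNodeLifecycleController(
//         factory.Coordination().V1beta1().Leases(),
//         factory.Core().V1().Pods(),
//         factory.Core().V1().Nodes(),
//         factory.Apps().V1().DaemonSets(),
//         client,
//         5*time.Second,  // nodeMonitorPeriod
//         time.Minute,    // nodeStartupGracePeriod
//         40*time.Second, // nodeMonitorGracePeriod
//         5*time.Minute,  // podEvictionTimeout
//         0.1, 0.01,      // evictionLimiterQPS, secondaryEvictionLimiterQPS
//         50,             // largeClusterThreshold
//         0.55,           // unhealthyZoneThreshold
//         true, true, true,
//     )
//     if err == nil {
//         factory.Start(stopCh)
//         go nc.Run(stopCh)
//     }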
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    klog.Infof("Starting node controller")
    defer klog.Infof("Shutting down node controller")

    if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
        return
    }

    if nc.runTaintManager {
        go nc.taintManager.Run(stopCh)
    }

    // Close the node update queue to clean up the worker goroutines.
    defer nc.nodeUpdateQueue.ShutDown()

    // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
    for i := 0; i < scheduler.UpdateWorkerSize; i++ {
        // Thanks to "workqueue", each worker just needs to get items from the queue:
        // an item is marked in-flight when taken from the queue, and if a new event
        // arrives the item is re-queued until "Done" is called, so no more than one
        // worker handles the same item and no event is missed.
        go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
    }

    if nc.useTaintBasedEvictions {
        // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
        // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
        go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
    } else {
        // Managing eviction of nodes:
        // When we delete pods off a node, if the node was not empty at the time we then
        // queue an eviction watcher. If we hit an error, retry deletion.
        go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
    }

    // Incorporate the results of node health signals pushed from the kubelet to the master.
    go wait.Until(func() {
        if err := nc.monitorNodeHealth(); err != nil {
            klog.Errorf("Error monitoring node health: %v", err)
        }
    }, nc.nodeMonitorPeriod, stopCh)

    <-stopCh
}
func (nc *Controller) doNodeProcessingPassWorker() {
    for {
        obj, shutdown := nc.nodeUpdateQueue.Get()
        // "nodeUpdateQueue" will be shut down when "stopCh" is closed;
        // we do not need to re-check "stopCh" again.
        if shutdown {
            return
        }
        nodeName := obj.(string)

        if nc.taintNodeByCondition {
            if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
                klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
                // TODO(k82cn): Add nodeName back to the queue
            }
        }

        // TODO: re-evaluate whether there are any labels that need to be
        // reconciled in 1.19. Remove this function if it's no longer necessary.
        if err := nc.reconcileNodeLabels(nodeName); err != nil {
            klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
            // TODO(yujuhong): Add nodeName back to the queue
        }

        nc.nodeUpdateQueue.Done(nodeName)
    }
}
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        // If the node is not found, just ignore it.
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }

    // Map the node's conditions to taints.
    var taints []v1.Taint
    for _, condition := range node.Status.Conditions {
        if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
            if taintKey, found := taintMap[condition.Status]; found {
                taints = append(taints, v1.Taint{
                    Key:    taintKey,
                    Effect: v1.TaintEffectNoSchedule,
                })
            }
        }
    }
    if node.Spec.Unschedulable {
        // If unschedulable, append the related taint.
        taints = append(taints, v1.Taint{
            Key:    schedulerapi.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
        })
    }

    // Get the node's existing taints.
    nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
        // Only NoSchedule taints are candidates to be compared with "taints" later.
        if t.Effect != v1.TaintEffectNoSchedule {
            return false
        }
        // Find the unschedulable taint of the node.
        if t.Key == schedulerapi.TaintNodeUnschedulable {
            return true
        }
        // Find the node condition taints of the node.
        _, found := taintKeyToNodeConditionMap[t.Key]
        return found
    })
    taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
    // If there is nothing to add or delete, return directly.
    if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
        return nil
    }

    if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
        return fmt.Errorf("failed to swap taints of node %+v", node)
    }
    return nil
}
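
// Illustrative sketch (assumed inputs, not from this file): the diff-and-swap
// above is what keeps condition taints in sync. Given a node whose conditions
// report MemoryPressure=True but whose spec still carries a stale
// node.kubernetes.io/disk-pressure:NoSchedule taint, the pass computes
//
//     desired  = [memory-pressure:NoSchedule]
//     existing = [disk-pressure:NoSchedule]
//
// so TaintSetDiff yields taintsToAdd = [memory-pressure] and
// taintsToDel = [disk-pressure], and SwapNodeControllerTaint applies both
// in a single update.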
func (nc *Controller) doNoExecuteTaintingPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zoneNoExecuteTainter {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
                return true, 0
            } else if err != nil {
                klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
                // retry in 50 milliseconds
                return false, 50 * time.Millisecond
            }
            _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
            // Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
            taintToAdd := v1.Taint{}
            oppositeTaint := v1.Taint{}
            if condition.Status == v1.ConditionFalse {
                taintToAdd = *NotReadyTaintTemplate
                oppositeTaint = *UnreachableTaintTemplate
            } else if condition.Status == v1.ConditionUnknown {
                taintToAdd = *UnreachableTaintTemplate
                oppositeTaint = *NotReadyTaintTemplate
            } else {
                // It seems that the Node is ready again, so there's no need to taint it.
                klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
                return true, 0
            }

            result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
            if result {
                // Count the eviction in the evictionsNumber metric.
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }

            return result, 0
        })
    }
}
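
// Illustrative sketch (hypothetical callback, not from this file): the contract
// of RateLimitedTimedQueue.Try as used above and in doEvictionPass. The callback
// reports success, or asks to be retried after a delay:
//
//     queue.Try(func(value scheduler.TimedValue) (bool, time.Duration) {
//         if err := process(value.Value); err != nil {
//             return false, 50 * time.Millisecond // keep in queue, retry later
//         }
//         return true, 0 // done, drop from queue
//     })
//
// process() is a placeholder for whatever work the pass performs; the queue
// itself enforces the zone's rate limit between attempts.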
func (nc *Controller) doEvictionPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zonePodEvictor {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
            } else if err != nil {
                klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
            }
            nodeUID, _ := value.UID.(string)
            remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
                return false, 0
            }
            if remaining {
                klog.Infof("Pods awaiting deletion due to Controller eviction")
            }

            // Count the eviction in the evictionsNumber metric.
            if node != nil {
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }

            return true, 0
        })
    }
}
// monitorNodeHealth verifies that node health is constantly updated by the
// kubelet and, if not, posts "NodeReady==ConditionUnknown". Nodes that are not
// ready or not reachable for a long period of time are tainted if the
// TaintBasedEvictions feature is enabled; otherwise their pods are evicted directly.
func (nc *Controller) monitorNodeHealth() error {
    // We are listing nodes from local cache as we can tolerate some small delays
    // comparing to state from etcd and there is eventual consistency anyway.
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }
    added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)

    for i := range newZoneRepresentatives {
        nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
    }

    for i := range added {
        klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
        nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
        nc.knownNodeSet[added[i].Name] = added[i]
        nc.addPodEvictorForNewZone(added[i])
        if nc.useTaintBasedEvictions {
            nc.markNodeAsReachable(added[i])
        } else {
            nc.cancelPodEviction(added[i])
        }
    }

    for i := range deleted {
        klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
        nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
        delete(nc.knownNodeSet, deleted[i].Name)
    }

    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    for i := range nodes {
        var gracePeriod time.Duration
        var observedReadyCondition v1.NodeCondition
        var currentReadyCondition *v1.NodeCondition
        node := nodes[i].DeepCopy()
        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
            gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
            if err == nil {
                return true, nil
            }
            name := node.Name
            node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
            if err != nil {
                klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
                return false, err
            }
            return false, nil
        }); err != nil {
            klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
                "Skipping - no pods will be evicted.", node.Name, err)
            continue
        }

        // We do not treat a master node as a part of the cluster for network disruption checking.
        if !system.IsMasterNode(node.Name) {
            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
        }

        decisionTimestamp := nc.now()
        if currentReadyCondition != nil {
            // Check eviction timeout against decisionTimestamp.
            if observedReadyCondition.Status == v1.ConditionFalse {
                if nc.useTaintBasedEvictions {
                    // We want to update the taint straight away if the Node is already tainted with the UnreachableTaint.
                    if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
                        taintToAdd := *NotReadyTaintTemplate
                        if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
                            klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
                        }
                    } else if nc.markNodeForTainting(node) {
                        klog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
                            node.Name,
                            decisionTimestamp,
                        )
                    }
                } else {
                    if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
                        if nc.evictPods(node) {
                            klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
                                node.Name,
                                decisionTimestamp,
                                nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
                                nc.podEvictionTimeout,
                            )
                        }
                    }
                }
            }
            if observedReadyCondition.Status == v1.ConditionUnknown {
                if nc.useTaintBasedEvictions {
                    // We want to update the taint straight away if the Node is already tainted with the NotReadyTaint.
                    if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
                        taintToAdd := *UnreachableTaintTemplate
                        if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
                            klog.Errorf("Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle.")
                        }
                    } else if nc.markNodeForTainting(node) {
                        klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
                            node.Name,
                            decisionTimestamp,
                        )
                    }
                } else {
                    if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
                        if nc.evictPods(node) {
                            klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
                                node.Name,
                                decisionTimestamp,
                                nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
                                nc.podEvictionTimeout-gracePeriod,
                            )
                        }
                    }
                }
            }
            if observedReadyCondition.Status == v1.ConditionTrue {
                if nc.useTaintBasedEvictions {
                    removed, err := nc.markNodeAsReachable(node)
                    if err != nil {
                        klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
                    }
                    if removed {
                        klog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
                    }
                } else {
                    if nc.cancelPodEviction(node) {
                        klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
                    }
                }
            }

            // Report node event.
            if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
                if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
                    utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v", node.Name, err))
                }
            }
        }
    }
    nc.handleDisruption(zoneToNodeConditions, nodes)

    return nil
}
// tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns the grace period to
// which the given node is entitled, the last observed and the current Ready Condition, and an error if one occurred.
func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
    var err error
    var gracePeriod time.Duration
    var observedReadyCondition v1.NodeCondition
    _, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
    if currentReadyCondition == nil {
        // If the ready condition is nil, then the kubelet (or nodecontroller) never posted node status.
        // A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime are set
        // to node.CreationTimestamp, to avoid handling the corner case.
        observedReadyCondition = v1.NodeCondition{
            Type:               v1.NodeReady,
            Status:             v1.ConditionUnknown,
            LastHeartbeatTime:  node.CreationTimestamp,
            LastTransitionTime: node.CreationTimestamp,
        }
        gracePeriod = nc.nodeStartupGracePeriod
        if _, found := nc.nodeHealthMap[node.Name]; found {
            nc.nodeHealthMap[node.Name].status = &node.Status
        } else {
            nc.nodeHealthMap[node.Name] = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           node.CreationTimestamp,
                readyTransitionTimestamp: node.CreationTimestamp,
            }
        }
    } else {
        // If the ready condition is not nil, make a copy of it, since we may modify it in place later.
        observedReadyCondition = *currentReadyCondition
        gracePeriod = nc.nodeMonitorGracePeriod
    }

    savedNodeHealth, found := nc.nodeHealthMap[node.Name]
    // There are the following cases to check:
    // - both the saved and new status have no Ready Condition set - we leave everything as it is,
    // - the saved status has no Ready Condition, but the current one does - the Controller was restarted with Node data already present in etcd,
    // - the saved status has some Ready Condition, but the current one does not - it's an error, but we fill it up because that's probably a good thing to do,
    // - both the saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
    //   unresponsive, so we leave it as it is,
    // - both the saved and current statuses have Ready Conditions, they have different LastProbeTimes, but the same Ready Condition State -
    //   everything's in order, no transition occurred, we update only probeTimestamp,
    // - both the saved and current statuses have Ready Conditions, different LastProbeTimes and different Ready Condition States -
    //   the Ready Condition changed its state since we last saw it, so we update both probeTimestamp and readyTransitionTimestamp.
    // TODO: things to consider:
    // - if 'LastProbeTime' has gone back in time, it's probably an error; currently we ignore it,
    // - currently the only correct Ready State transition outside of the Node Controller is the kubelet marking it ready; we don't check
    //   if that's the case, but it does not seem necessary.
    var savedCondition *v1.NodeCondition
    var savedLease *coordv1beta1.Lease
    if found {
        _, savedCondition = nodeutil.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
        savedLease = savedNodeHealth.lease
    }
    _, observedCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
    if !found {
        klog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition == nil && observedCondition != nil {
        klog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && observedCondition == nil {
        klog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
        // TODO: figure out what to do in this case. For now we do the same thing as above.
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
        var transitionTime metav1.Time
        // If the ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
        // otherwise we leave it as it is.
        if savedCondition.LastTransitionTime != observedCondition.LastTransitionTime {
            klog.V(3).Infof("ReadyCondition for Node %s transitioned from %v to %v", node.Name, savedCondition, observedCondition)
            transitionTime = nc.now()
        } else {
            transitionTime = savedNodeHealth.readyTransitionTimestamp
        }
        if klog.V(5) {
            klog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeHealth.status, node.Status)
        } else {
            klog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
        }
        savedNodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: transitionTime,
        }
    }
    var observedLease *coordv1beta1.Lease
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        // Always update the probe time if the node lease is renewed.
        // Note: If the kubelet never posted the node status, but continues renewing the
        // heartbeat leases, the node controller will assume the node is healthy and
        // take no action.
        observedLease, _ = nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
        if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
            savedNodeHealth.lease = observedLease
            savedNodeHealth.probeTimestamp = nc.now()
        }
    }
    nc.nodeHealthMap[node.Name] = savedNodeHealth

    if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
        // The NodeReady condition or lease was last set longer ago than gracePeriod, so
        // update it to Unknown (regardless of its current value) in the master.
        if currentReadyCondition == nil {
            klog.V(2).Infof("node %v is never updated by kubelet", node.Name)
            node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
                Type:               v1.NodeReady,
                Status:             v1.ConditionUnknown,
                Reason:             "NodeStatusNeverUpdated",
                Message:            "Kubelet never posted node status.",
                LastHeartbeatTime:  node.CreationTimestamp,
                LastTransitionTime: nc.now(),
            })
        } else {
            klog.V(4).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
                node.Name, nc.now().Time.Sub(savedNodeHealth.probeTimestamp.Time), observedReadyCondition)
            if observedReadyCondition.Status != v1.ConditionUnknown {
                currentReadyCondition.Status = v1.ConditionUnknown
                currentReadyCondition.Reason = "NodeStatusUnknown"
                currentReadyCondition.Message = "Kubelet stopped posting node status."
                // LastProbeTime is the last time we heard from the kubelet.
                currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
                currentReadyCondition.LastTransitionTime = nc.now()
            }
        }

        // The remaining node conditions should also be set to Unknown.
        remainingNodeConditionTypes := []v1.NodeConditionType{
            v1.NodeMemoryPressure,
            v1.NodeDiskPressure,
            v1.NodePIDPressure,
            // We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.
            // v1.NodeNetworkUnavailable,
        }

        nowTimestamp := nc.now()
        for _, nodeConditionType := range remainingNodeConditionTypes {
            _, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
            if currentCondition == nil {
                klog.V(2).Infof("Condition %v of node %v was never updated by kubelet", nodeConditionType, node.Name)
                node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
                    Type:               nodeConditionType,
                    Status:             v1.ConditionUnknown,
                    Reason:             "NodeStatusNeverUpdated",
                    Message:            "Kubelet never posted node status.",
                    LastHeartbeatTime:  node.CreationTimestamp,
                    LastTransitionTime: nowTimestamp,
                })
            } else {
                klog.V(4).Infof("node %v hasn't been updated for %+v. Last %v is: %+v",
                    node.Name, nc.now().Time.Sub(savedNodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
                if currentCondition.Status != v1.ConditionUnknown {
                    currentCondition.Status = v1.ConditionUnknown
                    currentCondition.Reason = "NodeStatusUnknown"
                    currentCondition.Message = "Kubelet stopped posting node status."
                    currentCondition.LastTransitionTime = nowTimestamp
                }
            }
        }

        _, currentCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
        if !apiequality.Semantic.DeepEqual(currentCondition, &observedReadyCondition) {
            if _, err = nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
                klog.Errorf("Error updating node %s: %v", node.Name, err)
                return gracePeriod, observedReadyCondition, currentReadyCondition, err
            }
            nc.nodeHealthMap[node.Name] = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           nc.nodeHealthMap[node.Name].probeTimestamp,
                readyTransitionTimestamp: nc.now(),
                lease:                    observedLease,
            }
            return gracePeriod, observedReadyCondition, currentReadyCondition, nil
        }
    }

    return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
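
// Illustrative timing sketch (assumed values, not from this file): with
// nodeMonitorGracePeriod = 40s and a kubelet heartbeat every 10s, the check
// nc.now().After(probeTimestamp.Add(gracePeriod)) above only fires after
// roughly four missed heartbeats:
//
//     probeTimestamp = 10:00:00  (last status/lease seen)
//     grace deadline = 10:00:40
//     pass at 10:00:41 -> NodeReady set to ConditionUnknown in the master
//
// A freshly registered node uses nodeStartupGracePeriod (e.g. 1m) instead, so
// slow-starting kubelets are not immediately marked Unknown.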
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
    newZoneStates := map[string]ZoneState{}
    allAreFullyDisrupted := true
    for k, v := range zoneToNodeConditions {
        zoneSize.WithLabelValues(k).Set(float64(len(v)))
        unhealthy, newState := nc.computeZoneStateFunc(v)
        zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
        unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
        if newState != stateFullDisruption {
            allAreFullyDisrupted = false
        }
        newZoneStates[k] = newState
        if _, had := nc.zoneStates[k]; !had {
            klog.Errorf("Setting initial state for unseen zone: %v", k)
            nc.zoneStates[k] = stateInitial
        }
    }

    allWasFullyDisrupted := true
    for k, v := range nc.zoneStates {
        if _, have := zoneToNodeConditions[k]; !have {
            zoneSize.WithLabelValues(k).Set(0)
            zoneHealth.WithLabelValues(k).Set(100)
            unhealthyNodes.WithLabelValues(k).Set(0)
            delete(nc.zoneStates, k)
            continue
        }
        if v != stateFullDisruption {
            allWasFullyDisrupted = false
            break
        }
    }

    // At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
    // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
    // - if the new state is "normal" we resume normal operation (go back to default limiter settings),
    // - if the new state is "fullDisruption" we restore the normal eviction rate,
    //   unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
    if !allAreFullyDisrupted || !allWasFullyDisrupted {
        // We're switching to full disruption mode.
        if allAreFullyDisrupted {
            klog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
            for i := range nodes {
                if nc.useTaintBasedEvictions {
                    _, err := nc.markNodeAsReachable(nodes[i])
                    if err != nil {
                        klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
                    }
                } else {
                    nc.cancelPodEviction(nodes[i])
                }
            }
            // We stop all evictions.
            for k := range nc.zoneStates {
                if nc.useTaintBasedEvictions {
                    nc.zoneNoExecuteTainter[k].SwapLimiter(0)
                } else {
                    nc.zonePodEvictor[k].SwapLimiter(0)
                }
            }
            for k := range nc.zoneStates {
                nc.zoneStates[k] = stateFullDisruption
            }
            // All rate limiters are updated, so we can return early here.
            return
        }
        // We're exiting full disruption mode.
        if allWasFullyDisrupted {
            klog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
            // When exiting disruption mode, update probe timestamps on all Nodes.
            now := nc.now()
            for i := range nodes {
                v := nc.nodeHealthMap[nodes[i].Name]
                v.probeTimestamp = now
                v.readyTransitionTimestamp = now
                nc.nodeHealthMap[nodes[i].Name] = v
            }
            // We reset all rate limiters to settings appropriate for the given state.
            for k := range nc.zoneStates {
                nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
                nc.zoneStates[k] = newZoneStates[k]
            }
            return
        }
        // We know that there's at least one not-fully-disrupted zone, so
        // we can use the default behavior for rate limiters.
        for k, v := range nc.zoneStates {
            newState := newZoneStates[k]
            if v == newState {
                continue
            }
            klog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
            nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
            nc.zoneStates[k] = newState
        }
    }
}
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
    switch state {
    case stateNormal:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
        }
    case statePartialDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        }
    case stateFullDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNoExecuteTainter[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        }
    }
}
// classifyNodes classifies allNodes into three categories:
//  1. added: the nodes that are in 'allNodes', but not in 'knownNodeSet'
//  2. deleted: the nodes that are in 'knownNodeSet', but not in 'allNodes'
//  3. newZoneRepresentatives: the nodes that are in both 'knownNodeSet' and 'allNodes', but have no zone state
func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
    for i := range allNodes {
        if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
            added = append(added, allNodes[i])
        } else {
            // Currently, we only consider a new zone as updated.
            zone := utilnode.GetZoneKey(allNodes[i])
            if _, found := nc.zoneStates[zone]; !found {
                newZoneRepresentatives = append(newZoneRepresentatives, allNodes[i])
            }
        }
    }

    // If there's a difference between the lengths of known Nodes and observed nodes,
    // we must have removed some Node.
    if len(nc.knownNodeSet)+len(added) != len(allNodes) {
        knowSetCopy := map[string]*v1.Node{}
        for k, v := range nc.knownNodeSet {
            knowSetCopy[k] = v
        }
        for i := range allNodes {
            delete(knowSetCopy, allNodes[i].Name)
        }
        for i := range knowSetCopy {
            deleted = append(deleted, knowSetCopy[i])
        }
    }
    return
}
// HealthyQPSFunc returns the default value for the cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
    return nc.evictionLimiterQPS
}

// ReducedQPSFunc returns the QPS to use when a zone is disrupted: if the cluster
// is large, make evictions slower; if it's small, stop evictions altogether.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
    if int32(nodeNum) > nc.largeClusterThreshold {
        return nc.secondaryEvictionLimiterQPS
    }
    return 0
}
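
// Illustrative sketch (assumed flag values, not from this file): with
// largeClusterThreshold = 50, evictionLimiterQPS = 0.1 and
// secondaryEvictionLimiterQPS = 0.01, a partially disrupted zone in a
// 100-node cluster is slowed to one eviction every 100 seconds, while the
// same state in a 30-node cluster stops evictions entirely:
//
//     nc.ReducedQPSFunc(100) // == 0.01 (nodeNum > largeClusterThreshold)
//     nc.ReducedQPSFunc(30)  // == 0    (nodeNum <= largeClusterThreshold)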
// addPodEvictorForNewZone checks if a new zone appeared, and if so, adds a new evictor.
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    zone := utilnode.GetZoneKey(node)
    if _, found := nc.zoneStates[zone]; !found {
        nc.zoneStates[zone] = stateInitial
        if !nc.useTaintBasedEvictions {
            nc.zonePodEvictor[zone] =
                scheduler.NewRateLimitedTimedQueue(
                    flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
        } else {
            nc.zoneNoExecuteTainter[zone] =
                scheduler.NewRateLimitedTimedQueue(
                    flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
        }
        // Init the metric for the new zone.
        klog.Infof("Initializing eviction metric for zone: %v", zone)
        evictionsNumber.WithLabelValues(zone).Add(0)
    }
}
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
    zone := utilnode.GetZoneKey(node)
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
    if wasDeleting {
        klog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
        return true
    }
    return false
}

// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *Controller) evictPods(node *v1.Node) bool {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

func (nc *Controller) markNodeForTainting(node *v1.Node) bool {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
    if err != nil {
        klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
        return false, err
    }
    err = controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, NotReadyTaintTemplate)
    if err != nil {
        klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
        return false, err
    }
    return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
// ComputeZoneState computes the state of a zone from the NodeReady conditions of its Nodes,
// and returns the number of not-Ready Nodes alongside that state. The zone is considered:
// - fullyDisrupted if there are no Ready Nodes,
// - partiallyDisrupted if more than two Nodes are not Ready and at least
//   nc.unhealthyZoneThreshold (as a fraction) of Nodes are not Ready,
// - normal otherwise
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
    readyNodes := 0
    notReadyNodes := 0
    for i := range nodeReadyConditions {
        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
            readyNodes++
        } else {
            notReadyNodes++
        }
    }
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return notReadyNodes, stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
        return notReadyNodes, statePartialDisruption
    default:
        return notReadyNodes, stateNormal
    }
}
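
// Worked example (assumed threshold, not from this file): with
// unhealthyZoneThreshold = 0.55 and a zone of 10 nodes:
//
//     0 Ready, 10 not Ready -> (10, stateFullDisruption)
//     4 Ready,  6 not Ready -> (6,  statePartialDisruption) // 6/10 >= 0.55 and 6 > 2
//     8 Ready,  2 not Ready -> (2,  stateNormal)            // 2 is not > 2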
// reconcileNodeLabels reconciles node labels.
func (nc *Controller) reconcileNodeLabels(nodeName string) error {
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        // If the node is not found, just ignore it.
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }

    if node.Labels == nil {
        // Nothing to reconcile.
        return nil
    }

    labelsToUpdate := map[string]string{}
    for _, r := range labelReconcileInfo {
        primaryValue, primaryExists := node.Labels[r.primaryKey]
        secondaryValue, secondaryExists := node.Labels[r.secondaryKey]

        if !primaryExists {
            // The primary label key does not exist. This should not happen
            // within our supported version skew range, when no external
            // components/factors are modifying the node object. Ignore this case.
            continue
        }
        if secondaryExists && primaryValue != secondaryValue {
            // The secondary label exists, but is not consistent with the primary
            // label. Need to reconcile.
            labelsToUpdate[r.secondaryKey] = primaryValue
        } else if !secondaryExists && r.ensureSecondaryExists {
            // Apply the secondary label based on the primary label.
            labelsToUpdate[r.secondaryKey] = primaryValue
        }
    }

    if len(labelsToUpdate) == 0 {
        return nil
    }
    if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
        return fmt.Errorf("failed to update labels for node %+v", node)
    }
    return nil
}
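
// Illustrative sketch (hypothetical labels, not from this file): a node carrying
//
//     beta.kubernetes.io/arch: amd64
//     kubernetes.io/arch:      arm64
//
// is reconciled by the loop above to labelsToUpdate = {"kubernetes.io/arch": "amd64"},
// because the beta (primary) key is the source of truth until #73084 flips it.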