12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package daemon
- import (
- "context"
- "fmt"
- "reflect"
- "sort"
- "sync"
- "time"
- "k8s.io/klog"
- apps "k8s.io/api/apps/v1"
- v1 "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- appsinformers "k8s.io/client-go/informers/apps/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- appslisters "k8s.io/client-go/listers/apps/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/flowcontrol"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/component-base/metrics/prometheus/ratelimiter"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/daemon/util"
- pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/utils/integer"
- )
// Tunables for the DaemonSet controller.
const (
	// BurstReplicas bounds how many replicas a DaemonSet creates or deletes
	// before it waits to observe the resulting watch events.
	// The value of 250 is chosen because values that are too high can cause
	// registry DoS issues.
	BurstReplicas = 250

	// StatusUpdateRetries limits the number of retries when sending a status
	// update to the API server fails.
	StatusUpdateRetries = 1

	// BackoffGCInterval is the time that has to pass before the next
	// iteration of the backoff GC is run.
	BackoffGCInterval = time.Minute
)
// Event reasons used by the DaemonSet controller.
const (
	// SelectingAllReason is the reason attached to an event when a DaemonSet
	// selects all Pods.
	SelectingAllReason = "SelectingAll"

	// FailedPlacementReason is the reason attached to an event when a
	// DaemonSet can't schedule a Pod to a specified node.
	FailedPlacementReason = "FailedPlacement"

	// FailedDaemonPodReason is the reason attached to an event when the
	// status of a DaemonSet Pod is 'Failed'.
	FailedDaemonPodReason = "FailedDaemonPod"
)
- // controllerKind contains the schema.GroupVersionKind for this controller type.
- var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
- // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
- // in the system with actual running pods.
- type DaemonSetsController struct {
- kubeClient clientset.Interface
- eventRecorder record.EventRecorder
- podControl controller.PodControlInterface
- crControl controller.ControllerRevisionControlInterface
- // An dsc is temporarily suspended after creating/deleting these many replicas.
- // It resumes normal action after observing the watch events for them.
- burstReplicas int
- // To allow injection of syncDaemonSet for testing.
- syncHandler func(dsKey string) error
- // used for unit testing
- enqueueDaemonSet func(ds *apps.DaemonSet)
- // A TTLCache of pod creates/deletes each ds expects to see
- expectations controller.ControllerExpectationsInterface
- // dsLister can list/get daemonsets from the shared informer's store
- dsLister appslisters.DaemonSetLister
- // dsStoreSynced returns true if the daemonset store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- dsStoreSynced cache.InformerSynced
- // historyLister get list/get history from the shared informers's store
- historyLister appslisters.ControllerRevisionLister
- // historyStoreSynced returns true if the history store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- historyStoreSynced cache.InformerSynced
- // podLister get list/get pods from the shared informers's store
- podLister corelisters.PodLister
- // podNodeIndex indexes pods by their nodeName
- podNodeIndex cache.Indexer
- // podStoreSynced returns true if the pod store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- podStoreSynced cache.InformerSynced
- // nodeLister can list/get nodes from the shared informer's store
- nodeLister corelisters.NodeLister
- // nodeStoreSynced returns true if the node store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- nodeStoreSynced cache.InformerSynced
- // DaemonSet keys that need to be synced.
- queue workqueue.RateLimitingInterface
- failedPodsBackoff *flowcontrol.Backoff
- }
- // NewDaemonSetsController creates a new DaemonSetsController
- func NewDaemonSetsController(
- daemonSetInformer appsinformers.DaemonSetInformer,
- historyInformer appsinformers.ControllerRevisionInformer,
- podInformer coreinformers.PodInformer,
- nodeInformer coreinformers.NodeInformer,
- kubeClient clientset.Interface,
- failedPodsBackoff *flowcontrol.Backoff,
- ) (*DaemonSetsController, error) {
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
- return nil, err
- }
- }
- dsc := &DaemonSetsController{
- kubeClient: kubeClient,
- eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
- podControl: controller.RealPodControl{
- KubeClient: kubeClient,
- Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
- },
- crControl: controller.RealControllerRevisionControl{
- KubeClient: kubeClient,
- },
- burstReplicas: BurstReplicas,
- expectations: controller.NewControllerExpectations(),
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
- }
- daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- ds := obj.(*apps.DaemonSet)
- klog.V(4).Infof("Adding daemon set %s", ds.Name)
- dsc.enqueueDaemonSet(ds)
- },
- UpdateFunc: func(old, cur interface{}) {
- oldDS := old.(*apps.DaemonSet)
- curDS := cur.(*apps.DaemonSet)
- klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
- dsc.enqueueDaemonSet(curDS)
- },
- DeleteFunc: dsc.deleteDaemonset,
- })
- dsc.dsLister = daemonSetInformer.Lister()
- dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
- historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addHistory,
- UpdateFunc: dsc.updateHistory,
- DeleteFunc: dsc.deleteHistory,
- })
- dsc.historyLister = historyInformer.Lister()
- dsc.historyStoreSynced = historyInformer.Informer().HasSynced
- // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
- // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
- podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addPod,
- UpdateFunc: dsc.updatePod,
- DeleteFunc: dsc.deletePod,
- })
- dsc.podLister = podInformer.Lister()
- // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
- podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
- "nodeName": indexByPodNodeName,
- })
- dsc.podNodeIndex = podInformer.Informer().GetIndexer()
- dsc.podStoreSynced = podInformer.Informer().HasSynced
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addNode,
- UpdateFunc: dsc.updateNode,
- },
- )
- dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
- dsc.nodeLister = nodeInformer.Lister()
- dsc.syncHandler = dsc.syncDaemonSet
- dsc.enqueueDaemonSet = dsc.enqueue
- dsc.failedPodsBackoff = failedPodsBackoff
- return dsc, nil
- }
- func indexByPodNodeName(obj interface{}) ([]string, error) {
- pod, ok := obj.(*v1.Pod)
- if !ok {
- return []string{}, nil
- }
- // We are only interested in active pods with nodeName set
- if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
- return []string{}, nil
- }
- return []string{pod.Spec.NodeName}, nil
- }
- func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
- ds, ok := obj.(*apps.DaemonSet)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
- return
- }
- ds, ok = tombstone.Obj.(*apps.DaemonSet)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
- return
- }
- }
- klog.V(4).Infof("Deleting daemon set %s", ds.Name)
- dsc.enqueueDaemonSet(ds)
- }
- // Run begins watching and syncing daemon sets.
- func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer dsc.queue.ShutDown()
- klog.Infof("Starting daemon sets controller")
- defer klog.Infof("Shutting down daemon sets controller")
- if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
- return
- }
- for i := 0; i < workers; i++ {
- go wait.Until(dsc.runWorker, time.Second, stopCh)
- }
- go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
- <-stopCh
- }
- func (dsc *DaemonSetsController) runWorker() {
- for dsc.processNextWorkItem() {
- }
- }
- // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
- func (dsc *DaemonSetsController) processNextWorkItem() bool {
- dsKey, quit := dsc.queue.Get()
- if quit {
- return false
- }
- defer dsc.queue.Done(dsKey)
- err := dsc.syncHandler(dsKey.(string))
- if err == nil {
- dsc.queue.Forget(dsKey)
- return true
- }
- utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
- dsc.queue.AddRateLimited(dsKey)
- return true
- }
- func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
- key, err := controller.KeyFunc(ds)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
- return
- }
- // TODO: Handle overlapping controllers better. See comment in ReplicationManager.
- dsc.queue.Add(key)
- }
- func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
- key, err := controller.KeyFunc(ds)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
- return
- }
- dsc.queue.AddRateLimited(key)
- }
- func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
- return
- }
- // TODO: Handle overlapping controllers better. See comment in ReplicationManager.
- dsc.queue.AddAfter(key, after)
- }
- // getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
- func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
- sets, err := dsc.dsLister.GetPodDaemonSets(pod)
- if err != nil {
- return nil
- }
- if len(sets) > 1 {
- // ControllerRef will ensure we don't do anything crazy, but more than one
- // item in this list nevertheless constitutes user error.
- utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
- }
- return sets
- }
- // getDaemonSetsForHistory returns a list of DaemonSets that potentially
- // match a ControllerRevision.
- func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
- daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
- if err != nil || len(daemonSets) == 0 {
- return nil
- }
- if len(daemonSets) > 1 {
- // ControllerRef will ensure we don't do anything crazy, but more than one
- // item in this list nevertheless constitutes user error.
- klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
- history.Namespace, history.Name, history.Labels)
- }
- return daemonSets
- }
- // addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
- // or when the controller manager is restarted.
- func (dsc *DaemonSetsController) addHistory(obj interface{}) {
- history := obj.(*apps.ControllerRevision)
- if history.DeletionTimestamp != nil {
- // On a restart of the controller manager, it's possible for an object to
- // show up in a state that is already pending deletion.
- dsc.deleteHistory(history)
- return
- }
- // If it has a ControllerRef, that's all that matters.
- if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
- ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s added.", history.Name)
- return
- }
- // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
- // them to see if anyone wants to adopt it.
- daemonSets := dsc.getDaemonSetsForHistory(history)
- if len(daemonSets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
- for _, ds := range daemonSets {
- dsc.enqueueDaemonSet(ds)
- }
- }
- // updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
- // is updated and wake them up. If anything of the ControllerRevision has changed, we need to awaken
- // both the old and new DaemonSets.
- func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
- curHistory := cur.(*apps.ControllerRevision)
- oldHistory := old.(*apps.ControllerRevision)
- if curHistory.ResourceVersion == oldHistory.ResourceVersion {
- // Periodic resync will send update events for all known ControllerRevisions.
- return
- }
- curControllerRef := metav1.GetControllerOf(curHistory)
- oldControllerRef := metav1.GetControllerOf(oldHistory)
- controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
- if controllerRefChanged && oldControllerRef != nil {
- // The ControllerRef was changed. Sync the old controller, if any.
- if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
- dsc.enqueueDaemonSet(ds)
- }
- }
- // If it has a ControllerRef, that's all that matters.
- if curControllerRef != nil {
- ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
- dsc.enqueueDaemonSet(ds)
- return
- }
- // Otherwise, it's an orphan. If anything changed, sync matching controllers
- // to see if anyone wants to adopt it now.
- labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
- if labelChanged || controllerRefChanged {
- daemonSets := dsc.getDaemonSetsForHistory(curHistory)
- if len(daemonSets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
- for _, ds := range daemonSets {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- // deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
- // the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or
- // a DeletionFinalStateUnknown marker item.
- func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
- history, ok := obj.(*apps.ControllerRevision)
- // When a delete is dropped, the relist will notice a ControllerRevision in the store not
- // in the list, leading to the insertion of a tombstone object which contains
- // the deleted key/value. Note that this value might be stale. If the ControllerRevision
- // changed labels the new DaemonSet will not be woken up till the periodic resync.
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
- return
- }
- history, ok = tombstone.Obj.(*apps.ControllerRevision)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
- return
- }
- }
- controllerRef := metav1.GetControllerOf(history)
- if controllerRef == nil {
- // No controller should care about orphans being deleted.
- return
- }
- ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
- dsc.enqueueDaemonSet(ds)
- }
- func (dsc *DaemonSetsController) addPod(obj interface{}) {
- pod := obj.(*v1.Pod)
- if pod.DeletionTimestamp != nil {
- // on a restart of the controller manager, it's possible a new pod shows up in a state that
- // is already pending deletion. Prevent the pod from being a creation observation.
- dsc.deletePod(pod)
- return
- }
- // If it has a ControllerRef, that's all that matters.
- if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
- ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
- if ds == nil {
- return
- }
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return
- }
- klog.V(4).Infof("Pod %s added.", pod.Name)
- dsc.expectations.CreationObserved(dsKey)
- dsc.enqueueDaemonSet(ds)
- return
- }
- // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
- // them to see if anyone wants to adopt it.
- // DO NOT observe creation because no controller should be waiting for an
- // orphan.
- dss := dsc.getDaemonSetsForPod(pod)
- if len(dss) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
- for _, ds := range dss {
- dsc.enqueueDaemonSet(ds)
- }
- }
- // When a pod is updated, figure out what sets manage it and wake them
- // up. If the labels of the pod have changed we need to awaken both the old
- // and new set. old and cur must be *v1.Pod types.
- func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
- curPod := cur.(*v1.Pod)
- oldPod := old.(*v1.Pod)
- if curPod.ResourceVersion == oldPod.ResourceVersion {
- // Periodic resync will send update events for all known pods.
- // Two different versions of the same pod will always have different RVs.
- return
- }
- if curPod.DeletionTimestamp != nil {
- // when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
- // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
- // for modification of the deletion timestamp and expect an ds to create more replicas asap, not wait
- // until the kubelet actually deletes the pod.
- dsc.deletePod(curPod)
- return
- }
- curControllerRef := metav1.GetControllerOf(curPod)
- oldControllerRef := metav1.GetControllerOf(oldPod)
- controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
- if controllerRefChanged && oldControllerRef != nil {
- // The ControllerRef was changed. Sync the old controller, if any.
- if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
- dsc.enqueueDaemonSet(ds)
- }
- }
- // If it has a ControllerRef, that's all that matters.
- if curControllerRef != nil {
- ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("Pod %s updated.", curPod.Name)
- dsc.enqueueDaemonSet(ds)
- changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
- // See https://github.com/kubernetes/kubernetes/pull/38076 for more details
- if changedToReady && ds.Spec.MinReadySeconds > 0 {
- // Add a second to avoid milliseconds skew in AddAfter.
- // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
- dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
- }
- return
- }
- // Otherwise, it's an orphan. If anything changed, sync matching controllers
- // to see if anyone wants to adopt it now.
- dss := dsc.getDaemonSetsForPod(curPod)
- if len(dss) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
- labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
- if labelChanged || controllerRefChanged {
- for _, ds := range dss {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- func (dsc *DaemonSetsController) deletePod(obj interface{}) {
- pod, ok := obj.(*v1.Pod)
- // When a delete is dropped, the relist will notice a pod in the store not
- // in the list, leading to the insertion of a tombstone object which contains
- // the deleted key/value. Note that this value might be stale. If the pod
- // changed labels the new daemonset will not be woken up till the periodic
- // resync.
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
- return
- }
- pod, ok = tombstone.Obj.(*v1.Pod)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
- return
- }
- }
- controllerRef := metav1.GetControllerOf(pod)
- if controllerRef == nil {
- // No controller should care about orphans being deleted.
- return
- }
- ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
- if ds == nil {
- return
- }
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return
- }
- klog.V(4).Infof("Pod %s deleted.", pod.Name)
- dsc.expectations.DeletionObserved(dsKey)
- dsc.enqueueDaemonSet(ds)
- }
- func (dsc *DaemonSetsController) addNode(obj interface{}) {
- // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
- return
- }
- node := obj.(*v1.Node)
- for _, ds := range dsList {
- shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- continue
- }
- if shouldRun {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- // nodeInSameCondition returns true if all effective types ("Status" is true) equals;
- // otherwise, returns false.
- func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
- if len(old) == 0 && len(cur) == 0 {
- return true
- }
- c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
- for _, c := range old {
- if c.Status == v1.ConditionTrue {
- c1map[c.Type] = c.Status
- }
- }
- for _, c := range cur {
- if c.Status != v1.ConditionTrue {
- continue
- }
- if _, found := c1map[c.Type]; !found {
- return false
- }
- delete(c1map, c.Type)
- }
- return len(c1map) == 0
- }
- func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
- if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
- return false
- }
- oldNode.ResourceVersion = curNode.ResourceVersion
- oldNode.Status.Conditions = curNode.Status.Conditions
- return apiequality.Semantic.DeepEqual(oldNode, curNode)
- }
- func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
- oldNode := old.(*v1.Node)
- curNode := cur.(*v1.Node)
- if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
- return
- }
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- klog.V(4).Infof("Error listing daemon sets: %v", err)
- return
- }
- // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
- for _, ds := range dsList {
- oldShouldRun, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
- if err != nil {
- continue
- }
- currentShouldRun, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
- if err != nil {
- continue
- }
- if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- // getDaemonPods returns daemon pods owned by the given ds.
- // This also reconciles ControllerRef by adopting/orphaning.
- // Note that returned Pods are pointers to objects in the cache.
- // If you want to modify one, you need to deep-copy it first.
- func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
- selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
- if err != nil {
- return nil, err
- }
- // List all pods to include those that don't match the selector anymore but
- // have a ControllerRef pointing to this controller.
- pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
- if err != nil {
- return nil, err
- }
- // If any adoptions are attempted, we should first recheck for deletion with
- // an uncached quorum read sometime after listing Pods (see #42639).
- dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
- fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
- if fresh.UID != ds.UID {
- return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
- }
- return fresh, nil
- })
- // Use ControllerRefManager to adopt/orphan as needed.
- cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
- return cm.ClaimPods(pods)
- }
- // getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
- // This also reconciles ControllerRef by adopting/orphaning.
- // Note that returned Pods are pointers to objects in the cache.
- // If you want to modify one, you need to deep-copy it first.
- func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
- claimedPods, err := dsc.getDaemonPods(ds)
- if err != nil {
- return nil, err
- }
- // Group Pods by Node name.
- nodeToDaemonPods := make(map[string][]*v1.Pod)
- for _, pod := range claimedPods {
- nodeName, err := util.GetTargetNodeName(pod)
- if err != nil {
- klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
- pod.Namespace, pod.Name, ds.Namespace, ds.Name)
- continue
- }
- nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
- }
- return nodeToDaemonPods, nil
- }
- // resolveControllerRef returns the controller referenced by a ControllerRef,
- // or nil if the ControllerRef could not be resolved to a matching controller
- // of the correct Kind.
- func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
- // We can't look up by UID, so look up by Name and then verify UID.
- // Don't even try to look up by Name if it's the wrong Kind.
- if controllerRef.Kind != controllerKind.Kind {
- return nil
- }
- ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
- if err != nil {
- return nil
- }
- if ds.UID != controllerRef.UID {
- // The controller we found with this Name is not the same one that the
- // ControllerRef points to.
- return nil
- }
- return ds
- }
- // podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
- // - nodesNeedingDaemonPods: the pods need to start on the node
- // - podsToDelete: the Pods need to be deleted on the node
- // - err: unexpected error
- func (dsc *DaemonSetsController) podsShouldBeOnNode(
- node *v1.Node,
- nodeToDaemonPods map[string][]*v1.Pod,
- ds *apps.DaemonSet,
- ) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
- shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- return
- }
- daemonPods, exists := nodeToDaemonPods[node.Name]
- switch {
- case shouldRun && !exists:
- // If daemon pod is supposed to be running on node, but isn't, create daemon pod.
- nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
- case shouldContinueRunning:
- // If a daemon pod failed, delete it
- // If there's non-daemon pods left on this node, we will create it in the next sync loop
- var daemonPodsRunning []*v1.Pod
- for _, pod := range daemonPods {
- if pod.DeletionTimestamp != nil {
- continue
- }
- if pod.Status.Phase == v1.PodFailed {
- // This is a critical place where DS is often fighting with kubelet that rejects pods.
- // We need to avoid hot looping and backoff.
- backoffKey := failedPodsBackoffKey(ds, node.Name)
- now := dsc.failedPodsBackoff.Clock.Now()
- inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
- if inBackoff {
- delay := dsc.failedPodsBackoff.Get(backoffKey)
- klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
- pod.Namespace, pod.Name, node.Name, delay)
- dsc.enqueueDaemonSetAfter(ds, delay)
- continue
- }
- dsc.failedPodsBackoff.Next(backoffKey, now)
- msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
- klog.V(2).Infof(msg)
- // Emit an event so that it's discoverable to users.
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
- podsToDelete = append(podsToDelete, pod.Name)
- } else {
- daemonPodsRunning = append(daemonPodsRunning, pod)
- }
- }
- // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
- // Sort the daemon pods by creation time, so the oldest is preserved.
- if len(daemonPodsRunning) > 1 {
- sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
- for i := 1; i < len(daemonPodsRunning); i++ {
- podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
- }
- }
- case !shouldContinueRunning && exists:
- // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
- for _, pod := range daemonPods {
- if pod.DeletionTimestamp != nil {
- continue
- }
- podsToDelete = append(podsToDelete, pod.Name)
- }
- }
- return nodesNeedingDaemonPods, podsToDelete, nil
- }
- // manage manages the scheduling and running of Pods of ds on nodes.
- // After figuring out which nodes should run a Pod of ds but not yet running one and
- // which nodes should not run a Pod of ds but currently running one, it calls function
- // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
- func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
- // Find out the pods which are created for the nodes by DaemonSet.
- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
- // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
- // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
- var nodesNeedingDaemonPods, podsToDelete []string
- for _, node := range nodeList {
- nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
- node, nodeToDaemonPods, ds)
- if err != nil {
- continue
- }
- nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
- podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
- }
- // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
- // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
- podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
- // Label new pods using the hash label value of the current history when creating them
- if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
- return err
- }
- return nil
- }
- // syncNodes deletes given pods and creates new daemon set pods on the given nodes
- // returns slice with errors if any
- func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
- // We need to set expectations before creating/deleting pods to avoid race conditions.
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
- }
- createDiff := len(nodesNeedingDaemonPods)
- deleteDiff := len(podsToDelete)
- if createDiff > dsc.burstReplicas {
- createDiff = dsc.burstReplicas
- }
- if deleteDiff > dsc.burstReplicas {
- deleteDiff = dsc.burstReplicas
- }
- dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
- // error channel to communicate back failures. make the buffer big enough to avoid any blocking
- errCh := make(chan error, createDiff+deleteDiff)
- klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
- createWait := sync.WaitGroup{}
- // If the returned error is not nil we have a parse error.
- // The controller handles this via the hash.
- generation, err := util.GetTemplateGeneration(ds)
- if err != nil {
- generation = nil
- }
- template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
- // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
- // and double with each successful iteration in a kind of "slow start".
- // This handles attempts to start large numbers of pods that would
- // likely all fail with the same error. For example a project with a
- // low quota that attempts to create a large number of pods will be
- // prevented from spamming the API service with the pod create requests
- // after one of its pods fails. Conveniently, this also prevents the
- // event spam that those failures would generate.
- batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
- for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
- errorCount := len(errCh)
- createWait.Add(batchSize)
- for i := pos; i < pos+batchSize; i++ {
- go func(ix int) {
- defer createWait.Done()
- podTemplate := template.DeepCopy()
- // The pod's NodeAffinity will be updated to make sure the Pod is bound
- // to the target node by default scheduler. It is safe to do so because there
- // should be no conflicting node affinity with the target node.
- podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
- podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
- err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
- ds, metav1.NewControllerRef(ds, controllerKind))
- if err != nil {
- if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
- // If the namespace is being torn down, we can safely ignore
- // this error since all subsequent creations will fail.
- return
- }
- }
- if err != nil {
- klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
- dsc.expectations.CreationObserved(dsKey)
- errCh <- err
- utilruntime.HandleError(err)
- }
- }(i)
- }
- createWait.Wait()
- // any skipped pods that we never attempted to start shouldn't be expected.
- skippedPods := createDiff - (batchSize + pos)
- if errorCount < len(errCh) && skippedPods > 0 {
- klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
- dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
- // The skipped pods will be retried later. The next controller resync will
- // retry the slow start process.
- break
- }
- }
- klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
- deleteWait := sync.WaitGroup{}
- deleteWait.Add(deleteDiff)
- for i := 0; i < deleteDiff; i++ {
- go func(ix int) {
- defer deleteWait.Done()
- if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
- klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
- dsc.expectations.DeletionObserved(dsKey)
- errCh <- err
- utilruntime.HandleError(err)
- }
- }(i)
- }
- deleteWait.Wait()
- // collect errors if any for proper reporting/retry logic in the controller
- errors := []error{}
- close(errCh)
- for err := range errCh {
- errors = append(errors, err)
- }
- return utilerrors.NewAggregate(errors)
- }
- func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error {
- if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
- int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
- int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
- int(ds.Status.NumberReady) == numberReady &&
- int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
- int(ds.Status.NumberAvailable) == numberAvailable &&
- int(ds.Status.NumberUnavailable) == numberUnavailable &&
- ds.Status.ObservedGeneration >= ds.Generation {
- return nil
- }
- toUpdate := ds.DeepCopy()
- var updateErr, getErr error
- for i := 0; i < StatusUpdateRetries; i++ {
- if updateObservedGen {
- toUpdate.Status.ObservedGeneration = ds.Generation
- }
- toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
- toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
- toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
- toUpdate.Status.NumberReady = int32(numberReady)
- toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
- toUpdate.Status.NumberAvailable = int32(numberAvailable)
- toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
- if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil {
- return nil
- }
- // Update the set with the latest resource version for the next poll
- if toUpdate, getErr = dsClient.Get(context.TODO(), ds.Name, metav1.GetOptions{}); getErr != nil {
- // If the GET fails we can't trust status.Replicas anymore. This error
- // is bound to be more interesting than the update failure.
- return getErr
- }
- }
- return updateErr
- }
- func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
- klog.V(4).Infof("Updating daemon set status")
- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
- var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
- for _, node := range nodeList {
- shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- return err
- }
- scheduled := len(nodeToDaemonPods[node.Name]) > 0
- if shouldRun {
- desiredNumberScheduled++
- if scheduled {
- currentNumberScheduled++
- // Sort the daemon pods by creation time, so that the oldest is first.
- daemonPods, _ := nodeToDaemonPods[node.Name]
- sort.Sort(podByCreationTimestampAndPhase(daemonPods))
- pod := daemonPods[0]
- if podutil.IsPodReady(pod) {
- numberReady++
- if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
- numberAvailable++
- }
- }
- // If the returned error is not nil we have a parse error.
- // The controller handles this via the hash.
- generation, err := util.GetTemplateGeneration(ds)
- if err != nil {
- generation = nil
- }
- if util.IsPodUpdated(pod, hash, generation) {
- updatedNumberScheduled++
- }
- }
- } else {
- if scheduled {
- numberMisscheduled++
- }
- }
- }
- numberUnavailable := desiredNumberScheduled - numberAvailable
- err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
- if err != nil {
- return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
- }
- // Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
- if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
- dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
- }
- return nil
- }
- func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
- }()
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
- if errors.IsNotFound(err) {
- klog.V(3).Infof("daemon set has been deleted %v", key)
- dsc.expectations.DeleteExpectations(key)
- return nil
- }
- if err != nil {
- return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
- }
- nodeList, err := dsc.nodeLister.List(labels.Everything())
- if err != nil {
- return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
- }
- everything := metav1.LabelSelector{}
- if reflect.DeepEqual(ds.Spec.Selector, &everything) {
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
- return nil
- }
- // Don't process a daemon set until all its creations and deletions have been processed.
- // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
- // then we do not want to call manage on foo until the daemon pods have been created.
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
- }
- // If the DaemonSet is being deleted (either by foreground deletion or
- // orphan deletion), we cannot be sure if the DaemonSet history objects
- // it owned still exist -- those history objects can either be deleted
- // or orphaned. Garbage collector doesn't guarantee that it will delete
- // DaemonSet pods before deleting DaemonSet history objects, because
- // DaemonSet history doesn't own DaemonSet pods. We cannot reliably
- // calculate the status of a DaemonSet being deleted. Therefore, return
- // here without updating status for the DaemonSet being deleted.
- if ds.DeletionTimestamp != nil {
- return nil
- }
- // Construct histories of the DaemonSet, and get the hash of current history
- cur, old, err := dsc.constructHistory(ds)
- if err != nil {
- return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
- }
- hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
- if !dsc.expectations.SatisfiedExpectations(dsKey) {
- // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
- return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
- }
- err = dsc.manage(ds, nodeList, hash)
- if err != nil {
- return err
- }
- // Process rolling updates if we're ready.
- if dsc.expectations.SatisfiedExpectations(dsKey) {
- switch ds.Spec.UpdateStrategy.Type {
- case apps.OnDeleteDaemonSetStrategyType:
- case apps.RollingUpdateDaemonSetStrategyType:
- err = dsc.rollingUpdate(ds, nodeList, hash)
- }
- if err != nil {
- return err
- }
- }
- err = dsc.cleanupHistory(ds, old)
- if err != nil {
- return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
- }
- return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
- }
- // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
- // summary. Returned booleans are:
- // * shouldRun:
- // Returns true when a daemonset should run on the node if a daemonset pod is not already
- // running on that node.
- // * shouldContinueRunning:
- // Returns true when a daemonset should continue running on a node if a daemonset pod is already
- // running on that node.
- func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
- pod := NewPod(ds, node.Name)
- // If the daemon set specifies a node name, check that it matches with node.Name.
- if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
- return false, false, nil
- }
- nodeInfo := schedulernodeinfo.NewNodeInfo()
- nodeInfo.SetNode(node)
- taints, err := nodeInfo.Taints()
- if err != nil {
- klog.Warningf("failed to get node %q taints: %v", node.Name, err)
- return false, false, err
- }
- fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
- if !fitsNodeName || !fitsNodeAffinity {
- return false, false, nil
- }
- if !fitsTaints {
- // Scheduled daemon pods should continue running if they tolerate NoExecute taint.
- shouldContinueRunning := v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
- return t.Effect == v1.TaintEffectNoExecute
- })
- return false, shouldContinueRunning, nil
- }
- return true, true, nil
- }
- // Predicates checks if a DaemonSet's pod can run on a node.
- func Predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
- fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
- fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
- fitsTaints = v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
- return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
- })
- return
- }
- // NewPod creates a new pod
- func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
- newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
- newPod.Namespace = ds.Namespace
- newPod.Spec.NodeName = nodeName
- // Added default tolerations for DaemonSet pods.
- util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)
- return newPod
- }
- type podByCreationTimestampAndPhase []*v1.Pod
- func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
- func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
- // Scheduled Pod first
- if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
- return true
- }
- if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
- return false
- }
- if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
- }
- func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
- return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
- }
- // getUnscheduledPodsWithoutNode returns list of unscheduled pods assigned to not existing nodes.
- // Returned pods can't be deleted by PodGCController so they should be deleted by DaemonSetController.
- func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
- var results []string
- isNodeRunning := make(map[string]bool)
- for _, node := range runningNodesList {
- isNodeRunning[node.Name] = true
- }
- for n, pods := range nodeToDaemonPods {
- if !isNodeRunning[n] {
- for _, pod := range pods {
- if len(pod.Spec.NodeName) == 0 {
- results = append(results, pod.Name)
- }
- }
- }
- }
- return results
- }
|