12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562 |
- package daemon
- import (
- "fmt"
- "reflect"
- "sort"
- "sync"
- "time"
- "k8s.io/klog"
- apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- appsinformers "k8s.io/client-go/informers/apps/v1"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
- unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
- v1core "k8s.io/client-go/kubernetes/typed/core/v1"
- appslisters "k8s.io/client-go/listers/apps/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
- "k8s.io/client-go/util/flowcontrol"
- "k8s.io/client-go/util/workqueue"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/daemon/util"
- "k8s.io/kubernetes/pkg/features"
- kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
- schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
- "k8s.io/kubernetes/pkg/util/metrics"
- "k8s.io/utils/integer"
- )
- const (
-
-
- BurstReplicas = 250
-
- StatusUpdateRetries = 1
-
- BackoffGCInterval = 1 * time.Minute
- )
- const (
-
- SelectingAllReason = "SelectingAll"
-
- FailedPlacementReason = "FailedPlacement"
-
- FailedDaemonPodReason = "FailedDaemonPod"
- )
- var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
- type DaemonSetsController struct {
- kubeClient clientset.Interface
- eventRecorder record.EventRecorder
- podControl controller.PodControlInterface
- crControl controller.ControllerRevisionControlInterface
-
-
- burstReplicas int
-
- syncHandler func(dsKey string) error
- // used for unit testing
- enqueueDaemonSet func(ds *apps.DaemonSet)
- enqueueDaemonSetRateLimited func(ds *apps.DaemonSet)
- // A TTLCache of pod creates/deletes each ds expects to see
- expectations controller.ControllerExpectationsInterface
- // dsLister can list/get daemonsets from the shared informer's store
- dsLister appslisters.DaemonSetLister
- // dsStoreSynced returns true if the daemonset store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- dsStoreSynced cache.InformerSynced
- // historyLister get list/get history from the shared informers's store
- historyLister appslisters.ControllerRevisionLister
- // historyStoreSynced returns true if the history store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- historyStoreSynced cache.InformerSynced
- // podLister get list/get pods from the shared informers's store
- podLister corelisters.PodLister
- // podNodeIndex indexes pods by their nodeName
- podNodeIndex cache.Indexer
- // podStoreSynced returns true if the pod store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- podStoreSynced cache.InformerSynced
- // nodeLister can list/get nodes from the shared informer's store
- nodeLister corelisters.NodeLister
- // nodeStoreSynced returns true if the node store has been synced at least once.
- // Added as a member to the struct to allow injection for testing.
- nodeStoreSynced cache.InformerSynced
- // DaemonSet keys that need to be synced.
- queue workqueue.RateLimitingInterface
- // The DaemonSet that has suspended pods on nodes; the key is node name, the value
- // is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
- suspendedDaemonPodsMutex sync.Mutex
- suspendedDaemonPods map[string]sets.String
- failedPodsBackoff *flowcontrol.Backoff
- }
- // NewDaemonSetsController creates a new DaemonSetsController
- func NewDaemonSetsController(
- daemonSetInformer appsinformers.DaemonSetInformer,
- historyInformer appsinformers.ControllerRevisionInformer,
- podInformer coreinformers.PodInformer,
- nodeInformer coreinformers.NodeInformer,
- kubeClient clientset.Interface,
- failedPodsBackoff *flowcontrol.Backoff,
- ) (*DaemonSetsController, error) {
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartLogging(klog.Infof)
- eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
- return nil, err
- }
- }
- dsc := &DaemonSetsController{
- kubeClient: kubeClient,
- eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
- podControl: controller.RealPodControl{
- KubeClient: kubeClient,
- Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
- },
- crControl: controller.RealControllerRevisionControl{
- KubeClient: kubeClient,
- },
- burstReplicas: BurstReplicas,
- expectations: controller.NewControllerExpectations(),
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
- suspendedDaemonPods: map[string]sets.String{},
- }
- daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- ds := obj.(*apps.DaemonSet)
- klog.V(4).Infof("Adding daemon set %s", ds.Name)
- dsc.enqueueDaemonSet(ds)
- },
- UpdateFunc: func(old, cur interface{}) {
- oldDS := old.(*apps.DaemonSet)
- curDS := cur.(*apps.DaemonSet)
- klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
- dsc.enqueueDaemonSet(curDS)
- },
- DeleteFunc: dsc.deleteDaemonset,
- })
- dsc.dsLister = daemonSetInformer.Lister()
- dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
- historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addHistory,
- UpdateFunc: dsc.updateHistory,
- DeleteFunc: dsc.deleteHistory,
- })
- dsc.historyLister = historyInformer.Lister()
- dsc.historyStoreSynced = historyInformer.Informer().HasSynced
-
-
- podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addPod,
- UpdateFunc: dsc.updatePod,
- DeleteFunc: dsc.deletePod,
- })
- dsc.podLister = podInformer.Lister()
-
- podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
- "nodeName": indexByPodNodeName,
- })
- dsc.podNodeIndex = podInformer.Informer().GetIndexer()
- dsc.podStoreSynced = podInformer.Informer().HasSynced
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: dsc.addNode,
- UpdateFunc: dsc.updateNode,
- },
- )
- dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
- dsc.nodeLister = nodeInformer.Lister()
- dsc.syncHandler = dsc.syncDaemonSet
- dsc.enqueueDaemonSet = dsc.enqueue
- dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
- dsc.failedPodsBackoff = failedPodsBackoff
- return dsc, nil
- }
- func indexByPodNodeName(obj interface{}) ([]string, error) {
- pod, ok := obj.(*v1.Pod)
- if !ok {
- return []string{}, nil
- }
-
- if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
- return []string{}, nil
- }
- return []string{pod.Spec.NodeName}, nil
- }
- func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
- ds, ok := obj.(*apps.DaemonSet)
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
- return
- }
- ds, ok = tombstone.Obj.(*apps.DaemonSet)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
- return
- }
- }
- klog.V(4).Infof("Deleting daemon set %s", ds.Name)
- dsc.enqueueDaemonSet(ds)
- }
- func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer dsc.queue.ShutDown()
- klog.Infof("Starting daemon sets controller")
- defer klog.Infof("Shutting down daemon sets controller")
- if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
- return
- }
- for i := 0; i < workers; i++ {
- go wait.Until(dsc.runWorker, time.Second, stopCh)
- }
- go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
- <-stopCh
- }
- func (dsc *DaemonSetsController) runWorker() {
- for dsc.processNextWorkItem() {
- }
- }
- func (dsc *DaemonSetsController) processNextWorkItem() bool {
- dsKey, quit := dsc.queue.Get()
- if quit {
- return false
- }
- defer dsc.queue.Done(dsKey)
- err := dsc.syncHandler(dsKey.(string))
- if err == nil {
- dsc.queue.Forget(dsKey)
- return true
- }
- utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
- dsc.queue.AddRateLimited(dsKey)
- return true
- }
- func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
- key, err := controller.KeyFunc(ds)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
- return
- }
-
- dsc.queue.Add(key)
- }
- func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
- key, err := controller.KeyFunc(ds)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
- return
- }
- dsc.queue.AddRateLimited(key)
- }
- func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
- return
- }
-
- dsc.queue.AddAfter(key, after)
- }
- func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
- sets, err := dsc.dsLister.GetPodDaemonSets(pod)
- if err != nil {
- return nil
- }
- if len(sets) > 1 {
-
-
- utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
- }
- return sets
- }
- func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
- daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
- if err != nil || len(daemonSets) == 0 {
- return nil
- }
- if len(daemonSets) > 1 {
-
-
- klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
- history.Namespace, history.Name, history.Labels)
- }
- return daemonSets
- }
- func (dsc *DaemonSetsController) addHistory(obj interface{}) {
- history := obj.(*apps.ControllerRevision)
- if history.DeletionTimestamp != nil {
-
-
- dsc.deleteHistory(history)
- return
- }
-
- if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
- ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s added.", history.Name)
- return
- }
-
-
- daemonSets := dsc.getDaemonSetsForHistory(history)
- if len(daemonSets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
- for _, ds := range daemonSets {
- dsc.enqueueDaemonSet(ds)
- }
- }
- func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
- curHistory := cur.(*apps.ControllerRevision)
- oldHistory := old.(*apps.ControllerRevision)
- if curHistory.ResourceVersion == oldHistory.ResourceVersion {
-
- return
- }
- curControllerRef := metav1.GetControllerOf(curHistory)
- oldControllerRef := metav1.GetControllerOf(oldHistory)
- controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
- if controllerRefChanged && oldControllerRef != nil {
-
- if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
- dsc.enqueueDaemonSet(ds)
- }
- }
-
- if curControllerRef != nil {
- ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
- dsc.enqueueDaemonSet(ds)
- return
- }
-
-
- labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
- if labelChanged || controllerRefChanged {
- daemonSets := dsc.getDaemonSetsForHistory(curHistory)
- if len(daemonSets) == 0 {
- return
- }
- klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
- for _, ds := range daemonSets {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
- history, ok := obj.(*apps.ControllerRevision)
-
-
-
-
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
- return
- }
- history, ok = tombstone.Obj.(*apps.ControllerRevision)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
- return
- }
- }
- controllerRef := metav1.GetControllerOf(history)
- if controllerRef == nil {
-
- return
- }
- ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
- dsc.enqueueDaemonSet(ds)
- }
- func (dsc *DaemonSetsController) addPod(obj interface{}) {
- pod := obj.(*v1.Pod)
- if pod.DeletionTimestamp != nil {
-
-
- dsc.deletePod(pod)
- return
- }
-
- if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
- ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
- if ds == nil {
- return
- }
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return
- }
- klog.V(4).Infof("Pod %s added.", pod.Name)
- dsc.expectations.CreationObserved(dsKey)
- dsc.enqueueDaemonSet(ds)
- return
- }
-
-
-
-
- dss := dsc.getDaemonSetsForPod(pod)
- if len(dss) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
- for _, ds := range dss {
- dsc.enqueueDaemonSet(ds)
- }
- }
- func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
- curPod := cur.(*v1.Pod)
- oldPod := old.(*v1.Pod)
- if curPod.ResourceVersion == oldPod.ResourceVersion {
-
-
- return
- }
- if curPod.DeletionTimestamp != nil {
-
-
-
-
- dsc.deletePod(curPod)
- return
- }
- curControllerRef := metav1.GetControllerOf(curPod)
- oldControllerRef := metav1.GetControllerOf(oldPod)
- controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
- if controllerRefChanged && oldControllerRef != nil {
-
- if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
- dsc.enqueueDaemonSet(ds)
- }
- }
-
- if curControllerRef != nil {
- ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
- if ds == nil {
- return
- }
- klog.V(4).Infof("Pod %s updated.", curPod.Name)
- dsc.enqueueDaemonSet(ds)
- changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
-
- if changedToReady && ds.Spec.MinReadySeconds > 0 {
-
-
- dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
- }
- return
- }
-
-
- dss := dsc.getDaemonSetsForPod(curPod)
- if len(dss) == 0 {
- return
- }
- klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
- labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
- if labelChanged || controllerRefChanged {
- for _, ds := range dss {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
- dsc.suspendedDaemonPodsMutex.Lock()
- defer dsc.suspendedDaemonPodsMutex.Unlock()
- if _, found := dsc.suspendedDaemonPods[node]; !found {
- return nil
- }
- for k := range dsc.suspendedDaemonPods[node] {
- dss = append(dss, k)
- }
- return
- }
- func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
- dss := dsc.listSuspendedDaemonPods(node)
- for _, dsKey := range dss {
- if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
- klog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
- continue
- } else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
- klog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
- continue
- } else {
- dsc.enqueueDaemonSetRateLimited(ds)
- }
- }
- }
- func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
- dsc.suspendedDaemonPodsMutex.Lock()
- defer dsc.suspendedDaemonPodsMutex.Unlock()
- if _, found := dsc.suspendedDaemonPods[node]; !found {
- dsc.suspendedDaemonPods[node] = sets.NewString()
- }
- dsc.suspendedDaemonPods[node].Insert(ds)
- }
- func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
- dsc.suspendedDaemonPodsMutex.Lock()
- defer dsc.suspendedDaemonPodsMutex.Unlock()
- if _, found := dsc.suspendedDaemonPods[node]; !found {
- return
- }
- dsc.suspendedDaemonPods[node].Delete(ds)
- if len(dsc.suspendedDaemonPods[node]) == 0 {
- delete(dsc.suspendedDaemonPods, node)
- }
- }
- func (dsc *DaemonSetsController) deletePod(obj interface{}) {
- pod, ok := obj.(*v1.Pod)
-
-
-
-
-
- if !ok {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
- return
- }
- pod, ok = tombstone.Obj.(*v1.Pod)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
- return
- }
- }
- controllerRef := metav1.GetControllerOf(pod)
- if controllerRef == nil {
-
- if len(pod.Spec.NodeName) != 0 {
-
- dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
- }
- return
- }
- ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
- if ds == nil {
- if len(pod.Spec.NodeName) != 0 {
-
- dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
- }
- return
- }
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return
- }
- klog.V(4).Infof("Pod %s deleted.", pod.Name)
- dsc.expectations.DeletionObserved(dsKey)
- dsc.enqueueDaemonSet(ds)
- }
- func (dsc *DaemonSetsController) addNode(obj interface{}) {
-
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
- return
- }
- node := obj.(*v1.Node)
- for _, ds := range dsList {
- _, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- continue
- }
- if shouldSchedule {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
- if len(old) == 0 && len(cur) == 0 {
- return true
- }
- c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
- for _, c := range old {
- if c.Status == v1.ConditionTrue {
- c1map[c.Type] = c.Status
- }
- }
- for _, c := range cur {
- if c.Status != v1.ConditionTrue {
- continue
- }
- if _, found := c1map[c.Type]; !found {
- return false
- }
- delete(c1map, c.Type)
- }
- return len(c1map) == 0
- }
- func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
- if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
- return false
- }
- oldNode.ResourceVersion = curNode.ResourceVersion
- oldNode.Status.Conditions = curNode.Status.Conditions
- return apiequality.Semantic.DeepEqual(oldNode, curNode)
- }
- func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
- oldNode := old.(*v1.Node)
- curNode := cur.(*v1.Node)
- if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
- return
- }
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- klog.V(4).Infof("Error listing daemon sets: %v", err)
- return
- }
-
- for _, ds := range dsList {
- _, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
- if err != nil {
- continue
- }
- _, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
- if err != nil {
- continue
- }
- if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
- dsc.enqueueDaemonSet(ds)
- }
- }
- }
- func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
- selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
- if err != nil {
- return nil, err
- }
-
-
- pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
- if err != nil {
- return nil, err
- }
-
-
- dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
- fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
- if fresh.UID != ds.UID {
- return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
- }
- return fresh, nil
- })
-
- cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
- return cm.ClaimPods(pods)
- }
- func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
- claimedPods, err := dsc.getDaemonPods(ds)
- if err != nil {
- return nil, err
- }
-
- nodeToDaemonPods := make(map[string][]*v1.Pod)
- for _, pod := range claimedPods {
- nodeName, err := util.GetTargetNodeName(pod)
- if err != nil {
- klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
- pod.Namespace, pod.Name, ds.Namespace, ds.Name)
- continue
- }
- nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
- }
- return nodeToDaemonPods, nil
- }
- func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
-
-
- if controllerRef.Kind != controllerKind.Kind {
- return nil
- }
- ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
- if err != nil {
- return nil
- }
- if ds.UID != controllerRef.UID {
-
-
- return nil
- }
- return ds
- }
- func (dsc *DaemonSetsController) podsShouldBeOnNode(
- node *v1.Node,
- nodeToDaemonPods map[string][]*v1.Pod,
- ds *apps.DaemonSet,
- ) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {
- wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- return
- }
- daemonPods, exists := nodeToDaemonPods[node.Name]
- dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
- dsc.removeSuspendedDaemonPods(node.Name, dsKey)
- switch {
- case wantToRun && !shouldSchedule:
-
- dsc.addSuspendedDaemonPods(node.Name, dsKey)
- case shouldSchedule && !exists:
-
- nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
- case shouldContinueRunning:
-
-
- var daemonPodsRunning []*v1.Pod
- for _, pod := range daemonPods {
- if pod.DeletionTimestamp != nil {
- continue
- }
- if pod.Status.Phase == v1.PodFailed {
- failedPodsObserved++
-
-
- backoffKey := failedPodsBackoffKey(ds, node.Name)
- now := dsc.failedPodsBackoff.Clock.Now()
- inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
- if inBackoff {
- delay := dsc.failedPodsBackoff.Get(backoffKey)
- klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
- pod.Namespace, pod.Name, node.Name, delay)
- dsc.enqueueDaemonSetAfter(ds, delay)
- continue
- }
- dsc.failedPodsBackoff.Next(backoffKey, now)
- msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
- klog.V(2).Infof(msg)
-
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
- podsToDelete = append(podsToDelete, pod.Name)
- } else {
- daemonPodsRunning = append(daemonPodsRunning, pod)
- }
- }
-
-
- if len(daemonPodsRunning) > 1 {
- sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
- for i := 1; i < len(daemonPodsRunning); i++ {
- podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
- }
- }
- case !shouldContinueRunning && exists:
-
- for _, pod := range daemonPods {
- podsToDelete = append(podsToDelete, pod.Name)
- }
- }
- return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
- }
- func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
-
- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
-
-
- var nodesNeedingDaemonPods, podsToDelete []string
- var failedPodsObserved int
- for _, node := range nodeList {
- nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
- node, nodeToDaemonPods, ds)
- if err != nil {
- continue
- }
- nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
- podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
- failedPodsObserved += failedPodsObservedOnNode
- }
-
-
- if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
- podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
- }
-
- if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
- return err
- }
-
- if failedPodsObserved > 0 {
- return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
- }
- return nil
- }
- func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
-
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
- }
- createDiff := len(nodesNeedingDaemonPods)
- deleteDiff := len(podsToDelete)
- if createDiff > dsc.burstReplicas {
- createDiff = dsc.burstReplicas
- }
- if deleteDiff > dsc.burstReplicas {
- deleteDiff = dsc.burstReplicas
- }
- dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
-
- errCh := make(chan error, createDiff+deleteDiff)
- klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
- createWait := sync.WaitGroup{}
-
-
- generation, err := util.GetTemplateGeneration(ds)
- if err != nil {
- generation = nil
- }
- template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
-
-
-
-
-
-
-
-
- batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
- for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
- errorCount := len(errCh)
- createWait.Add(batchSize)
- for i := pos; i < pos+batchSize; i++ {
- go func(ix int) {
- defer createWait.Done()
- var err error
- podTemplate := template.DeepCopy()
- if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
-
-
-
- podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
- podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
- err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
- ds, metav1.NewControllerRef(ds, controllerKind))
- } else {
-
- podTemplate.Spec.SchedulerName = "kubernetes.io/daemonset-controller"
- err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
- ds, metav1.NewControllerRef(ds, controllerKind))
- }
- if err != nil && errors.IsTimeout(err) {
-
-
-
-
-
-
-
- return
- }
- if err != nil {
- klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
- dsc.expectations.CreationObserved(dsKey)
- errCh <- err
- utilruntime.HandleError(err)
- }
- }(i)
- }
- createWait.Wait()
-
- skippedPods := createDiff - (batchSize + pos)
- if errorCount < len(errCh) && skippedPods > 0 {
- klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
- for i := 0; i < skippedPods; i++ {
- dsc.expectations.CreationObserved(dsKey)
- }
-
-
- break
- }
- }
- klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
- deleteWait := sync.WaitGroup{}
- deleteWait.Add(deleteDiff)
- for i := 0; i < deleteDiff; i++ {
- go func(ix int) {
- defer deleteWait.Done()
- if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
- klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
- dsc.expectations.DeletionObserved(dsKey)
- errCh <- err
- utilruntime.HandleError(err)
- }
- }(i)
- }
- deleteWait.Wait()
-
- errors := []error{}
- close(errCh)
- for err := range errCh {
- errors = append(errors, err)
- }
- return utilerrors.NewAggregate(errors)
- }
- func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error {
- if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
- int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
- int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
- int(ds.Status.NumberReady) == numberReady &&
- int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
- int(ds.Status.NumberAvailable) == numberAvailable &&
- int(ds.Status.NumberUnavailable) == numberUnavailable &&
- ds.Status.ObservedGeneration >= ds.Generation {
- return nil
- }
- toUpdate := ds.DeepCopy()
- var updateErr, getErr error
- for i := 0; i < StatusUpdateRetries; i++ {
- if updateObservedGen {
- toUpdate.Status.ObservedGeneration = ds.Generation
- }
- toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
- toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
- toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
- toUpdate.Status.NumberReady = int32(numberReady)
- toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
- toUpdate.Status.NumberAvailable = int32(numberAvailable)
- toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
- if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
- return nil
- }
-
- if toUpdate, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
-
-
- return getErr
- }
- }
- return updateErr
- }
- func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
- klog.V(4).Infof("Updating daemon set status")
- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
- if err != nil {
- return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
- }
- var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
- for _, node := range nodeList {
- wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
- if err != nil {
- return err
- }
- scheduled := len(nodeToDaemonPods[node.Name]) > 0
- if wantToRun {
- desiredNumberScheduled++
- if scheduled {
- currentNumberScheduled++
-
- daemonPods, _ := nodeToDaemonPods[node.Name]
- sort.Sort(podByCreationTimestampAndPhase(daemonPods))
- pod := daemonPods[0]
- if podutil.IsPodReady(pod) {
- numberReady++
- if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
- numberAvailable++
- }
- }
-
-
- generation, err := util.GetTemplateGeneration(ds)
- if err != nil {
- generation = nil
- }
- if util.IsPodUpdated(pod, hash, generation) {
- updatedNumberScheduled++
- }
- }
- } else {
- if scheduled {
- numberMisscheduled++
- }
- }
- }
- numberUnavailable := desiredNumberScheduled - numberAvailable
- err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
- if err != nil {
- return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
- }
-
- if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
- dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
- }
- return nil
- }
- func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
- }()
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
- if errors.IsNotFound(err) {
- klog.V(3).Infof("daemon set has been deleted %v", key)
- dsc.expectations.DeleteExpectations(key)
- return nil
- }
- if err != nil {
- return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
- }
- nodeList, err := dsc.nodeLister.List(labels.Everything())
- if err != nil {
- return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
- }
- everything := metav1.LabelSelector{}
- if reflect.DeepEqual(ds.Spec.Selector, &everything) {
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
- return nil
- }
-
-
-
- dsKey, err := controller.KeyFunc(ds)
- if err != nil {
- return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
- }
-
-
-
-
-
-
-
-
- if ds.DeletionTimestamp != nil {
- return nil
- }
-
- cur, old, err := dsc.constructHistory(ds)
- if err != nil {
- return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
- }
- hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
- if !dsc.expectations.SatisfiedExpectations(dsKey) {
-
- return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
- }
- err = dsc.manage(ds, nodeList, hash)
- if err != nil {
- return err
- }
-
- if dsc.expectations.SatisfiedExpectations(dsKey) {
- switch ds.Spec.UpdateStrategy.Type {
- case apps.OnDeleteDaemonSetStrategyType:
- case apps.RollingUpdateDaemonSetStrategyType:
- err = dsc.rollingUpdate(ds, nodeList, hash)
- }
- if err != nil {
- return err
- }
- }
- err = dsc.cleanupHistory(ds, old)
- if err != nil {
- return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
- }
- return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
- }
- func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]predicates.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
- objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
- if err != nil {
- return nil, nil, err
- }
- nodeInfo := schedulernodeinfo.NewNodeInfo()
- nodeInfo.SetNode(node)
- for _, obj := range objects {
-
- pod, ok := obj.(*v1.Pod)
- if !ok {
- continue
- }
- if metav1.IsControlledBy(pod, ds) {
- continue
- }
- nodeInfo.AddPod(pod)
- }
- _, reasons, err := Predicates(newPod, nodeInfo)
- return reasons, nodeInfo, err
- }
- func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
- newPod := NewPod(ds, node.Name)
-
-
-
- wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
-
- if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
- return false, false, false, nil
- }
- reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
- if err != nil {
- klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
- return false, false, false, err
- }
-
-
-
- var insufficientResourceErr error
- for _, r := range reasons {
- klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
- switch reason := r.(type) {
- case *predicates.InsufficientResourceError:
- insufficientResourceErr = reason
- case *predicates.PredicateFailureError:
- var emitEvent bool
-
- switch reason {
-
- case
- predicates.ErrNodeSelectorNotMatch,
- predicates.ErrPodNotMatchHostName,
- predicates.ErrNodeLabelPresenceViolated,
-
-
- predicates.ErrPodNotFitsHostPorts:
- return false, false, false, nil
- case predicates.ErrTaintsTolerationsNotMatch:
-
- fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
- if err != nil {
- return false, false, false, err
- }
- if !fitsNoExecute {
- return false, false, false, nil
- }
- wantToRun, shouldSchedule = false, false
-
- case
- predicates.ErrDiskConflict,
- predicates.ErrVolumeZoneConflict,
- predicates.ErrMaxVolumeCountExceeded,
- predicates.ErrNodeUnderMemoryPressure,
- predicates.ErrNodeUnderDiskPressure:
-
-
-
- shouldSchedule = false
- emitEvent = true
-
- case
- predicates.ErrPodAffinityNotMatch,
- predicates.ErrServiceAffinityViolated:
- klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
- return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
- default:
- klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
- wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
- emitEvent = true
- }
- if emitEvent {
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
- }
- }
- }
-
-
- if shouldSchedule && insufficientResourceErr != nil {
- dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
- shouldSchedule = false
- }
- return
- }
- func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
- newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
- newPod.Namespace = ds.Namespace
- newPod.Spec.NodeName = nodeName
-
- util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)
- return newPod
- }
- func checkNodeFitness(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
- var predicateFails []predicates.PredicateFailureReason
- fit, reasons, err := predicates.PodFitsHost(pod, meta, nodeInfo)
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- fit, reasons, err = predicates.PodMatchNodeSelector(pod, meta, nodeInfo)
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- return len(predicateFails) == 0, predicateFails, nil
- }
- func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
- var predicateFails []predicates.PredicateFailureReason
-
- if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
- fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- return len(predicateFails) == 0, predicateFails, nil
- }
- critical := kubelettypes.IsCriticalPod(pod)
- fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- if critical {
-
-
- fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
- } else {
- fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
- }
- if err != nil {
- return false, predicateFails, err
- }
- if !fit {
- predicateFails = append(predicateFails, reasons...)
- }
- return len(predicateFails) == 0, predicateFails, nil
- }
- type podByCreationTimestampAndPhase []*v1.Pod
- func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
- func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
-
- if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
- return true
- }
- if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
- return false
- }
- if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
- }
- func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
- return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
- }
- func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
- var results []string
- isNodeRunning := make(map[string]bool)
- for _, node := range runningNodesList {
- isNodeRunning[node.Name] = true
- }
- for n, pods := range nodeToDaemonPods {
- if !isNodeRunning[n] {
- for _, pod := range pods {
- if len(pod.Spec.NodeName) == 0 {
- results = append(results, pod.Name)
- }
- }
- }
- }
- return results
- }
|