daemon_controller.go

/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemon

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	"k8s.io/klog"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/workqueue"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/daemon/util"
	"k8s.io/kubernetes/pkg/features"
	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/pkg/util/metrics"
	"k8s.io/utils/integer"
)

const (
	// BurstReplicas is a rate limiter for booting pods on a lot of pods.
	// The value of 250 is chosen because values that are too high can cause registry DoS issues.
	BurstReplicas = 250
	// StatusUpdateRetries limits the number of retries if sending a status update to the API server fails.
	StatusUpdateRetries = 1
	// BackoffGCInterval is the time that has to pass before the next iteration of backoff GC is run.
	BackoffGCInterval = 1 * time.Minute
)

// Reasons for DaemonSet events
const (
	// SelectingAllReason is added to an event when a DaemonSet selects all Pods.
	SelectingAllReason = "SelectingAll"
	// FailedPlacementReason is added to an event when a DaemonSet can't schedule a Pod to a specified node.
	FailedPlacementReason = "FailedPlacement"
	// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
	FailedDaemonPodReason = "FailedDaemonPod"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")

// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
// in the system with actual running pods.
type DaemonSetsController struct {
	kubeClient    clientset.Interface
	eventRecorder record.EventRecorder
	podControl    controller.PodControlInterface
	crControl     controller.ControllerRevisionControlInterface

	// A dsc is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int

	// To allow injection of syncDaemonSet for testing.
	syncHandler func(dsKey string) error
	// used for unit testing
	enqueueDaemonSet            func(ds *apps.DaemonSet)
	enqueueDaemonSetRateLimited func(ds *apps.DaemonSet)
	// A TTLCache of pod creates/deletes each ds expects to see
	expectations controller.ControllerExpectationsInterface
	// dsLister can list/get daemonsets from the shared informer's store
	dsLister appslisters.DaemonSetLister
	// dsStoreSynced returns true if the daemonset store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dsStoreSynced cache.InformerSynced
	// historyLister can list/get ControllerRevisions from the shared informer's store
	historyLister appslisters.ControllerRevisionLister
	// historyStoreSynced returns true if the history store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	historyStoreSynced cache.InformerSynced
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister
	// podNodeIndex indexes pods by their nodeName
	podNodeIndex cache.Indexer
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// nodeLister can list/get nodes from the shared informer's store
	nodeLister corelisters.NodeLister
	// nodeStoreSynced returns true if the node store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	nodeStoreSynced cache.InformerSynced

	// DaemonSet keys that need to be synced.
	queue workqueue.RateLimitingInterface

	// suspendedDaemonPods tracks, per node name, the set of DaemonSets that want to
	// run pods on that node but could not schedule them in the latest sync cycle.
	suspendedDaemonPodsMutex sync.Mutex
	suspendedDaemonPods      map[string]sets.String

	failedPodsBackoff *flowcontrol.Backoff
}

// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(
	daemonSetInformer appsinformers.DaemonSetInformer,
	historyInformer appsinformers.ControllerRevisionInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dsc := &DaemonSetsController{
		kubeClient:    kubeClient,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		},
		crControl: controller.RealControllerRevisionControl{
			KubeClient: kubeClient,
		},
		burstReplicas:       BurstReplicas,
		expectations:        controller.NewControllerExpectations(),
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
		suspendedDaemonPods: map[string]sets.String{},
	}

	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ds := obj.(*apps.DaemonSet)
			klog.V(4).Infof("Adding daemon set %s", ds.Name)
			dsc.enqueueDaemonSet(ds)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldDS := old.(*apps.DaemonSet)
			curDS := cur.(*apps.DaemonSet)
			klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
			dsc.enqueueDaemonSet(curDS)
		},
		DeleteFunc: dsc.deleteDaemonset,
	})
	dsc.dsLister = daemonSetInformer.Lister()
	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addHistory,
		UpdateFunc: dsc.updateHistory,
		DeleteFunc: dsc.deleteHistory,
	})
	dsc.historyLister = historyInformer.Lister()
	dsc.historyStoreSynced = historyInformer.Informer().HasSynced

	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addPod,
		UpdateFunc: dsc.updatePod,
		DeleteFunc: dsc.deletePod,
	})
	dsc.podLister = podInformer.Lister()

	// This custom indexer will index pods by their NodeName, which reduces the number of pods we need to fetch in the simulate() call.
	podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
		"nodeName": indexByPodNodeName,
	})
	dsc.podNodeIndex = podInformer.Informer().GetIndexer()
	dsc.podStoreSynced = podInformer.Informer().HasSynced

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addNode,
		UpdateFunc: dsc.updateNode,
	},
	)
	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
	dsc.nodeLister = nodeInformer.Lister()

	dsc.syncHandler = dsc.syncDaemonSet
	dsc.enqueueDaemonSet = dsc.enqueue
	dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
	dsc.failedPodsBackoff = failedPodsBackoff

	return dsc, nil
}

func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	// We are only interested in active pods with nodeName set
	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}

func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
	ds, ok := obj.(*apps.DaemonSet)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		ds, ok = tombstone.Obj.(*apps.DaemonSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Deleting daemon set %s", ds.Name)
	dsc.enqueueDaemonSet(ds)
}

// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dsc.queue.ShutDown()

	klog.Infof("Starting daemon sets controller")
	defer klog.Infof("Shutting down daemon sets controller")

	if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dsc.runWorker, time.Second, stopCh)
	}

	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

	<-stopCh
}

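// The following is an illustrative sketch (not part of this controller) of how
// NewDaemonSetsController and Run are typically wired together; the shared
// informer factory, kubeClient, and stopCh below are assumptions made for the
// sake of the example:
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	dsc, err := NewDaemonSetsController(
//		factory.Apps().V1().DaemonSets(),
//		factory.Apps().V1().ControllerRevisions(),
//		factory.Core().V1().Pods(),
//		factory.Core().V1().Nodes(),
//		kubeClient,
//		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
//	)
//	if err != nil {
//		klog.Fatalf("error creating DaemonSets controller: %v", err)
//	}
//	factory.Start(stopCh)
//	go dsc.Run(2, stopCh)
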
func (dsc *DaemonSetsController) runWorker() {
	for dsc.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
	dsKey, quit := dsc.queue.Get()
	if quit {
		return false
	}
	defer dsc.queue.Done(dsKey)

	err := dsc.syncHandler(dsKey.(string))
	if err == nil {
		dsc.queue.Forget(dsKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
	dsc.queue.AddRateLimited(dsKey)

	return true
}

func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.Add(key)
}

func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
		return
	}

	dsc.queue.AddRateLimited(key)
}

func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.AddAfter(key, after)
}

// getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
	if err != nil {
		return nil
	}
	if len(sets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
	}
	return sets
}

// getDaemonSetsForHistory returns a list of DaemonSets that potentially
// match a ControllerRevision.
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
	daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
	if err != nil || len(daemonSets) == 0 {
		return nil
	}
	if len(daemonSets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
			history.Namespace, history.Name, history.Labels)
	}
	return daemonSets
}

// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
// or when the controller manager is restarted.
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
	history := obj.(*apps.ControllerRevision)
	if history.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dsc.deleteHistory(history)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
		ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("ControllerRevision %s added.", history.Name)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	daemonSets := dsc.getDaemonSetsForHistory(history)
	if len(daemonSets) == 0 {
		return
	}
	klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
	for _, ds := range daemonSets {
		dsc.enqueueDaemonSet(ds)
	}
}

// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
// is updated and wakes them up. If anything about the ControllerRevision has changed, we need to awaken
// both the old and new DaemonSets.
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
	curHistory := cur.(*apps.ControllerRevision)
	oldHistory := old.(*apps.ControllerRevision)
	if curHistory.ResourceVersion == oldHistory.ResourceVersion {
		// Periodic resync will send update events for all known ControllerRevisions.
		return
	}

	curControllerRef := metav1.GetControllerOf(curHistory)
	oldControllerRef := metav1.GetControllerOf(oldHistory)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
	if labelChanged || controllerRefChanged {
		daemonSets := dsc.getDaemonSetsForHistory(curHistory)
		if len(daemonSets) == 0 {
			return
		}
		klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
		for _, ds := range daemonSets {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
// the ControllerRevision is deleted. obj could be an *apps.ControllerRevision, or
// a DeletedFinalStateUnknown marker item.
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
	history, ok := obj.(*apps.ControllerRevision)

	// When a delete is dropped, the relist will notice a ControllerRevision in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ControllerRevision
	// changed labels the new DaemonSet will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		history, ok = tombstone.Obj.(*apps.ControllerRevision)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(history)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
	if ds == nil {
		return
	}
	klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
	dsc.enqueueDaemonSet(ds)
}

func (dsc *DaemonSetsController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		// on a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		dsc.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
		if ds == nil {
			return
		}
		dsKey, err := controller.KeyFunc(ds)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s added.", pod.Name)
		dsc.expectations.CreationObserved(dsKey)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	dss := dsc.getDaemonSetsForPod(pod)
	if len(dss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
	for _, ds := range dss {
		dsc.enqueueDaemonSet(ds)
	}
}

// When a pod is updated, figure out what sets manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new set. old and cur must be *v1.Pod types.
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	if curPod.DeletionTimestamp != nil {
		// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect a ds to create more replicas asap, not wait
		// until the kubelet actually deletes the pod.
		dsc.deletePod(curPod)
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("Pod %s updated.", curPod.Name)
		dsc.enqueueDaemonSet(ds)
		changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
		if changedToReady && ds.Spec.MinReadySeconds > 0 {
			// Add a second to avoid milliseconds skew in AddAfter.
			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
			dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	dss := dsc.getDaemonSetsForPod(curPod)
	if len(dss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if labelChanged || controllerRefChanged {
		for _, ds := range dss {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// listSuspendedDaemonPods lists the keys of the DaemonSets that have daemon pods which
// 'want to run, but should not schedule' on the node.
func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		return nil
	}

	for k := range dsc.suspendedDaemonPods[node] {
		dss = append(dss, k)
	}
	return
}

// requeueSuspendedDaemonPods enqueues all DaemonSets that have pods which 'want to run,
// but should not schedule' on the node, so the DaemonSetsController will sync them again.
func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
	dss := dsc.listSuspendedDaemonPods(node)
	for _, dsKey := range dss {
		if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
			klog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
			continue
		} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
			klog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
			continue
		} else {
			dsc.enqueueDaemonSetRateLimited(ds)
		}
	}
}

// addSuspendedDaemonPods adds a DaemonSet which has pods that 'want to run,
// but should not schedule' on the node to the suspended map.
func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		dsc.suspendedDaemonPods[node] = sets.NewString()
	}
	dsc.suspendedDaemonPods[node].Insert(ds)
}

// removeSuspendedDaemonPods removes a DaemonSet which has pods that 'want to run,
// but should not schedule' on the node from the suspended map.
func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
	dsc.suspendedDaemonPodsMutex.Lock()
	defer dsc.suspendedDaemonPodsMutex.Unlock()

	if _, found := dsc.suspendedDaemonPods[node]; !found {
		return
	}
	dsc.suspendedDaemonPods[node].Delete(ds)

	if len(dsc.suspendedDaemonPods[node]) == 0 {
		delete(dsc.suspendedDaemonPods, node)
	}
}

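// Note on the suspended-daemon-pods bookkeeping above: podsShouldBeOnNode calls
// addSuspendedDaemonPods when a DaemonSet wants to run a pod on a node but cannot
// schedule it there, and removeSuspendedDaemonPods on every evaluation of that node.
// deletePod calls requeueSuspendedDaemonPods when a scheduled pod that does not
// belong to a resolvable DaemonSet is deleted, since the freed resources may now
// allow a suspended daemon pod to fit on that node.
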
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new daemonset will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		if len(pod.Spec.NodeName) != 0 {
			// If scheduled pods were deleted, requeue suspended daemon pods.
			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
		}
		return
	}
	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
	if ds == nil {
		if len(pod.Spec.NodeName) != 0 {
			// If scheduled pods were deleted, requeue suspended daemon pods.
			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
		}
		return
	}
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return
	}
	klog.V(4).Infof("Pod %s deleted.", pod.Name)
	dsc.expectations.DeletionObserved(dsKey)
	dsc.enqueueDaemonSet(ds)
}

func (dsc *DaemonSetsController) addNode(obj interface{}) {
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
		return
	}
	node := obj.(*v1.Node)
	for _, ds := range dsList {
		_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			continue
		}
		if shouldSchedule {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// nodeInSameCondition returns true if the two condition lists agree on all effective
// conditions (those whose Status is True); otherwise it returns false.
func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
	if len(old) == 0 && len(cur) == 0 {
		return true
	}

	c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
	for _, c := range old {
		if c.Status == v1.ConditionTrue {
			c1map[c.Type] = c.Status
		}
	}

	for _, c := range cur {
		if c.Status != v1.ConditionTrue {
			continue
		}

		if _, found := c1map[c.Type]; !found {
			return false
		}

		delete(c1map, c.Type)
	}

	return len(c1map) == 0
}

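// shouldIgnoreNodeUpdate reports whether a node update is irrelevant to the
// DaemonSet controller: the two node objects must be semantically equal once
// ResourceVersion and Conditions (already compared by nodeInSameCondition) are
// excluded from the comparison. The parameters are passed by value, so mutating
// the copies below is safe.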
func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
	if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
		return false
	}
	oldNode.ResourceVersion = curNode.ResourceVersion
	oldNode.Status.Conditions = curNode.Status.Conditions
	return apiequality.Semantic.DeepEqual(oldNode, curNode)
}

func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)

	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
		return
	}

	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error listing daemon sets: %v", err)
		return
	}
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the updated node (unless it has other work to do, too).
	for _, ds := range dsList {
		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
		if err != nil {
			continue
		}
		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
		if err != nil {
			continue
		}
		if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// getDaemonPods returns daemon pods owned by the given ds.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// List all pods to include those that don't match the selector anymore but
	// have a ControllerRef pointing to this controller.
	pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != ds.UID {
			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
		}
		return fresh, nil
	})

	// Use ControllerRefManager to adopt/orphan as needed.
	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
	return cm.ClaimPods(pods)
}

// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
	claimedPods, err := dsc.getDaemonPods(ds)
	if err != nil {
		return nil, err
	}
	// Group Pods by Node name.
	nodeToDaemonPods := make(map[string][]*v1.Pod)
	for _, pod := range claimedPods {
		nodeName, err := util.GetTargetNodeName(pod)
		if err != nil {
			klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
				pod.Namespace, pod.Name, ds.Namespace, ds.Name)
			continue
		}

		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
	}

	return nodeToDaemonPods, nil
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if ds.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return ds
}

// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
// - nodesNeedingDaemonPods: the node names on which daemon pods need to be created
// - podsToDelete: the names of the pods that need to be deleted on the node
// - failedPodsObserved: the number of failed pods observed on the node
// - err: unexpected error
func (dsc *DaemonSetsController) podsShouldBeOnNode(
	node *v1.Node,
	nodeToDaemonPods map[string][]*v1.Pod,
	ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {

	wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
	if err != nil {
		return
	}

	daemonPods, exists := nodeToDaemonPods[node.Name]
	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
	dsc.removeSuspendedDaemonPods(node.Name, dsKey)

	switch {
	case wantToRun && !shouldSchedule:
		// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
		dsc.addSuspendedDaemonPods(node.Name, dsKey)
	case shouldSchedule && !exists:
		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// If a daemon pod failed, delete it.
		// If no daemon pod is left on this node afterwards, one will be created in the next sync loop.
		var daemonPodsRunning []*v1.Pod
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				failedPodsObserved++

				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				klog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
		}
		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
		// Sort the daemon pods by creation time, so the oldest is preserved.
		if len(daemonPodsRunning) > 1 {
			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
			for i := 1; i < len(daemonPodsRunning); i++ {
				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
			}
		}
	case !shouldContinueRunning && exists:
		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
		for _, pod := range daemonPods {
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}

	return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
}

// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but are not yet running one, and
// which nodes should not run a Pod of ds but are currently running one, it calls
// syncNodes with a list of pods to remove and a list of nodes on which to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	// Find out the pods which are created for the nodes by DaemonSet.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
	var nodesNeedingDaemonPods, podsToDelete []string
	var failedPodsObserved int
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds)

		if err != nil {
			continue
		}

		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
		failedPodsObserved += failedPodsObservedOnNode
	}

	// Remove unscheduled pods assigned to non-existent nodes when DaemonSet pods are scheduled by the default scheduler.
	// If a node doesn't exist, its pods are never scheduled and can't be deleted by the PodGCController.
	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
		podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
	}

	// Label new pods using the hash label value of the current history when creating them
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	// Return an error when daemon pods have failed, so the rate limiter prevents a kill-recreate hot loop.
	if failedPodsObserved > 0 {
		return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
	}

	return nil
}

// syncNodes deletes the given pods and creates new daemon set pods on the given nodes,
// returning an aggregate of any errors that occur.
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
	errCh := make(chan error, createDiff+deleteDiff)

	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails. Conveniently, this also prevents the
	// event spam that those failures would generate.
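	// As an illustration (assuming SlowStartInitialBatchSize is 1): with a
	// createDiff of 10, the batches below come out as 1, 2, 4 and finally 3
	// pods, so a systemic failure is detected after only a handful of requests.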
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()
				var err error

				podTemplate := template.DeepCopy()
				if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
					// The pod's NodeAffinity will be updated to make sure the Pod is bound
					// to the target node by default scheduler. It is safe to do so because there
					// should be no conflicting node affinity with the target node.
					podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
						podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

					err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				} else {
					// If pod is scheduled by DaemonSetController, set its '.spec.schedulerName'.
					podTemplate.Spec.SchedulerName = "kubernetes.io/daemonset-controller"

					err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				}

				if err != nil && errors.IsTimeout(err) {
					// Pod is created but its initialization has timed out.
					// If the initialization is successful eventually, the
					// controller will observe the creation via the informer.
					// If the initialization fails, or if the pod keeps
					// uninitialized for a long time, the informer will not
					// receive any update, and the controller will create a new
					// pod when the expectation expires.
					return
				}
				if err != nil {
					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - (batchSize + pos)
		if errorCount < len(errCh) && skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			for i := 0; i < skippedPods; i++ {
				dsc.expectations.CreationObserved(dsKey)
			}
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}

	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// collect errors if any for proper reporting/retry logic in the controller
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error {
	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
		int(ds.Status.NumberReady) == numberReady &&
		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
		int(ds.Status.NumberAvailable) == numberAvailable &&
		int(ds.Status.NumberUnavailable) == numberUnavailable &&
		ds.Status.ObservedGeneration >= ds.Generation {
		return nil
	}

	toUpdate := ds.DeepCopy()

	var updateErr, getErr error
	for i := 0; i < StatusUpdateRetries; i++ {
		if updateObservedGen {
			toUpdate.Status.ObservedGeneration = ds.Generation
		}
		toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
		toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
		toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
		toUpdate.Status.NumberReady = int32(numberReady)
		toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
		toUpdate.Status.NumberAvailable = int32(numberAvailable)
		toUpdate.Status.NumberUnavailable = int32(numberUnavailable)

		if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
			return nil
		}

		// Update the set with the latest resource version for the next poll
		if toUpdate, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			return getErr
		}
	}

	return updateErr
}

func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
	klog.V(4).Infof("Updating daemon set status")
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
	for _, node := range nodeList {
		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			return err
		}

		scheduled := len(nodeToDaemonPods[node.Name]) > 0

		if wantToRun {
			desiredNumberScheduled++
			if scheduled {
				currentNumberScheduled++
				// Sort the daemon pods by creation time, so that the oldest is first.
				daemonPods, _ := nodeToDaemonPods[node.Name]
				sort.Sort(podByCreationTimestampAndPhase(daemonPods))
				pod := daemonPods[0]
				if podutil.IsPodReady(pod) {
					numberReady++
					if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
						numberAvailable++
					}
				}
				// If the returned error is not nil we have a parse error.
				// The controller handles this via the hash.
				generation, err := util.GetTemplateGeneration(ds)
				if err != nil {
					generation = nil
				}
				if util.IsPodUpdated(pod, hash, generation) {
					updatedNumberScheduled++
				}
			}
		} else {
			if scheduled {
				numberMisscheduled++
			}
		}
	}
	numberUnavailable := desiredNumberScheduled - numberAvailable

	err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
	if err != nil {
		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
	}

	// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
		dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
	}
	return nil
}


func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(3).Infof("daemon set has been deleted %v", key)
		dsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
	}

	nodeList, err := dsc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Don't process a daemon set until all its creations and deletions have been processed.
	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
	// then we do not want to call manage on foo until the daemon pods have been created.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	// If the DaemonSet is being deleted (either by foreground deletion or
	// orphan deletion), we cannot be sure if the DaemonSet history objects
	// it owned still exist -- those history objects can either be deleted
	// or orphaned. Garbage collector doesn't guarantee that it will delete
	// DaemonSet pods before deleting DaemonSet history objects, because
	// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
	// calculate the status of a DaemonSet being deleted. Therefore, return
	// here without updating status for the DaemonSet being deleted.
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Construct histories of the DaemonSet, and get the hash of current history
	cur, old, err := dsc.constructHistory(ds)
	if err != nil {
		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
	}
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
	}

	err = dsc.manage(ds, nodeList, hash)
	if err != nil {
		return err
	}

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ds, nodeList, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
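
// Illustrative sketch (not part of the controller logic): syncDaemonSet is driven by workqueue
// keys of the form "<namespace>/<name>", so the split at the top of the function behaves like:
//
//	ns, name, err := cache.SplitMetaNamespaceKey("kube-system/fluentd-elasticsearch")
//	// ns == "kube-system", name == "fluentd-elasticsearch", err == nil
//
// The DaemonSet name used here is hypothetical.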

func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]predicates.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
	objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
	if err != nil {
		return nil, nil, err
	}

	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(node)

	for _, obj := range objects {
		// Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
		pod, ok := obj.(*v1.Pod)
		if !ok {
			continue
		}
		if metav1.IsControlledBy(pod, ds) {
			continue
		}
		nodeInfo.AddPod(pod)
	}

	_, reasons, err := Predicates(newPod, nodeInfo)
	return reasons, nodeInfo, err
}

// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
// summary. Returned booleans are:
// * wantToRun:
//     Returns true when a user would expect a pod to run on this node and ignores conditions
//     such as DiskPressure or insufficient resource that would cause a daemonset pod not to schedule.
//     This is primarily used to populate daemonset status.
// * shouldSchedule:
//     Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
//     running on that node.
// * shouldContinueRunning:
//     Returns true when a daemonset should continue running on a node if a daemonset pod is already
//     running on that node.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
	newPod := NewPod(ds, node.Name)

	// Because these bools require an && of all their required conditions, we start
	// with all bools set to true and set a bool to false if a condition is not met.
	// A bool should probably not be set to true after this line.
	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
	// If the daemon set specifies a node name, check that it matches with node.Name.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false, false, nil
	}

	reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
	if err != nil {
		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
		return false, false, false, err
	}

	// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
	// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
	// into one result, e.g. selectedNode.
	var insufficientResourceErr error
	for _, r := range reasons {
		klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
		switch reason := r.(type) {
		case *predicates.InsufficientResourceError:
			insufficientResourceErr = reason
		case *predicates.PredicateFailureError:
			var emitEvent bool
			// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
			switch reason {
			// intentional
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrNodeLabelPresenceViolated,
				// this one is probably intentional since it's a workaround for not having
				// pod hard anti affinity.
				predicates.ErrPodNotFitsHostPorts:
				return false, false, false, nil
			case predicates.ErrTaintsTolerationsNotMatch:
				// DaemonSet is expected to respect taints and tolerations
				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
				if err != nil {
					return false, false, false, err
				}
				if !fitsNoExecute {
					return false, false, false, nil
				}
				wantToRun, shouldSchedule = false, false
			// unintentional
			case
				predicates.ErrDiskConflict,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrMaxVolumeCountExceeded,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeUnderDiskPressure:
				// wantToRun and shouldContinueRunning are likely true here. They are
				// absolutely true at the time of writing the comment. See first comment
				// of this method.
				shouldSchedule = false
				emitEvent = true
			// unexpected
			case
				predicates.ErrPodAffinityNotMatch,
				predicates.ErrServiceAffinityViolated:
				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
			default:
				klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
				emitEvent = true
			}
			if emitEvent {
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
			}
		}
	}

	// only emit this event if insufficient resource is the only thing
	// preventing the daemon pod from scheduling
	if shouldSchedule && insufficientResourceErr != nil {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
		shouldSchedule = false
	}
	return
}
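
// Illustrative summary (not part of the controller logic) of how a few common predicate failures
// map onto the three booleans returned above, assuming a single failure reason per node:
//
//	failure reason                          wantToRun  shouldSchedule  shouldContinueRunning
//	ErrNodeSelectorNotMatch                 false      false           false
//	NoSchedule taint without toleration     false      false           true
//	NoExecute taint without toleration      false      false           false
//	ErrNodeUnderDiskPressure                true       false           true
//	InsufficientResourceError (by itself)   true       false           true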

// NewPod creates a new daemon pod from the DaemonSet's pod template, bound to the given node.
func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
	newPod.Namespace = ds.Namespace
	newPod.Spec.NodeName = nodeName

	// Add the default tolerations for DaemonSet pods.
	util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)

	return newPod
}
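
// Illustrative sketch (not part of the controller logic): the pod returned by NewPod is a copy of
// the template that is already bound via spec.nodeName and carries the default daemon tolerations:
//
//	pod := NewPod(ds, "node-1")
//	// pod.Namespace == ds.Namespace
//	// pod.Spec.NodeName == "node-1"
//	// pod.Spec.Tolerations now includes tolerations such as
//	// node.kubernetes.io/not-ready:NoExecute and node.kubernetes.io/unreachable:NoExecute,
//	// added by util.AddOrUpdateDaemonPodTolerations.
//
// The node name "node-1" is hypothetical.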

// checkNodeFitness runs a set of predicates that select candidate nodes for the DaemonSet;
// the predicates include:
//   - PodFitsHost: checks pod's NodeName against node
//   - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node
//   - PodToleratesNodeTaints: exclude tainted node unless pod has specific toleration
func checkNodeFitness(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	var predicateFails []predicates.PredicateFailureReason
	fit, reasons, err := predicates.PodFitsHost(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	fit, reasons, err = predicates.PodMatchNodeSelector(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}
	return len(predicateFails) == 0, predicateFails, nil
}

// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and PodToleratesNodeTaints predicate
func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	var predicateFails []predicates.PredicateFailureReason

	// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
		fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}

		return len(predicateFails) == 0, predicateFails, nil
	}

	critical := kubelettypes.IsCriticalPod(pod)

	fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}
	if critical {
		// If the pod is marked as critical and support for critical pod annotations is enabled,
		// check predicates for critical pods only.
		fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
	} else {
		fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
	}
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	return len(predicateFails) == 0, predicateFails, nil
}
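
// Illustrative sketch (not part of the controller logic) of how Predicates is driven from a
// simulated node view, mirroring what simulate() does above:
//
//	nodeInfo := schedulernodeinfo.NewNodeInfo()
//	nodeInfo.SetNode(node)
//	for _, p := range otherPodsOnNode { // pods on the node that are not owned by the DaemonSet
//		nodeInfo.AddPod(p)
//	}
//	fit, reasons, err := Predicates(NewPod(ds, node.Name), nodeInfo)
//	// fit == true means the daemon pod passes the applicable predicates;
//	// otherwise reasons lists every predicate that failed.
//
// otherPodsOnNode is a hypothetical slice used only for illustration.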

type podByCreationTimestampAndPhase []*v1.Pod

func (o podByCreationTimestampAndPhase) Len() int      { return len(o) }
func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
	// Scheduled Pod first
	if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
		return true
	}

	if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
		return false
	}

	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
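
// Illustrative sketch (not part of the controller logic): the ordering above puts scheduled pods
// before unscheduled ones and then sorts by creation time, with the pod name as a tie-breaker, so
// the oldest scheduled daemon pod ends up first:
//
//	pods := nodeToDaemonPods[node.Name]
//	sort.Sort(podByCreationTimestampAndPhase(pods))
//	oldest := pods[0]
//
// nodeToDaemonPods and node are hypothetical here; the same pattern is used in
// updateDaemonSetStatus above.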

func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
}
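
// Illustrative sketch (not part of the controller logic): the backoff key scopes failed-pod
// backoff by DaemonSet UID, observed generation and node name, e.g.:
//
//	key := failedPodsBackoffKey(ds, "node-1")
//	// key == "3c4c5a4e-0000-0000-0000-000000000000/7/node-1" for a DaemonSet with that UID and
//	// Status.ObservedGeneration == 7; the UID, generation and node name are all hypothetical.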

// getUnscheduledPodsWithoutNode returns a list of unscheduled pods that are assigned to nodes
// which no longer exist. Such pods can't be deleted by the PodGCController, so they have to be
// deleted by the DaemonSet controller itself.
func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
	var results []string
	isNodeRunning := make(map[string]bool)
	for _, node := range runningNodesList {
		isNodeRunning[node.Name] = true
	}
	for n, pods := range nodeToDaemonPods {
		if !isNodeRunning[n] {
			for _, pod := range pods {
				if len(pod.Spec.NodeName) == 0 {
					results = append(results, pod.Name)
				}
			}
		}
	}
	return results
}
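
// Illustrative sketch (not part of the controller logic): given one running node and a stale
// mapping that still references a deleted node, only the unscheduled pod on the missing node is
// returned:
//
//	nodes := []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}
//	mapping := map[string][]*v1.Pod{
//		"node-1":       {scheduledPod},   // node still exists, ignored
//		"node-deleted": {unscheduledPod}, // empty spec.nodeName on a missing node -> returned
//	}
//	names := getUnscheduledPodsWithoutNode(nodes, mapping)
//	// names == []string{unscheduledPod.Name}
//
// scheduledPod and unscheduledPod are hypothetical *v1.Pod values.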