daemon_controller.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemon

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/daemon/util"
	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/utils/integer"
)

const (
	// BurstReplicas is a rate limit for booting pods on a lot of nodes at once.
	// The value of 250 is chosen because values that are too high can cause registry DoS issues.
	BurstReplicas = 250

	// StatusUpdateRetries limits the number of retries if sending a status update to the API server fails.
	StatusUpdateRetries = 1

	// BackoffGCInterval is the time that has to pass before the next iteration of backoff GC is run.
	BackoffGCInterval = 1 * time.Minute
)

// Reasons for DaemonSet events
const (
	// SelectingAllReason is added to an event when a DaemonSet selects all Pods.
	SelectingAllReason = "SelectingAll"
	// FailedPlacementReason is added to an event when a DaemonSet can't schedule a Pod to a specified node.
	FailedPlacementReason = "FailedPlacement"
	// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
	FailedDaemonPodReason = "FailedDaemonPod"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")

// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
// in the system with actual running pods.
type DaemonSetsController struct {
	kubeClient    clientset.Interface
	eventRecorder record.EventRecorder
	podControl    controller.PodControlInterface
	crControl     controller.ControllerRevisionControlInterface

	// A dsc is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncDaemonSet for testing.
	syncHandler func(dsKey string) error
	// used for unit testing
	enqueueDaemonSet func(ds *apps.DaemonSet)
	// A TTLCache of pod creates/deletes each ds expects to see
	expectations controller.ControllerExpectationsInterface
	// dsLister can list/get daemonsets from the shared informer's store
	dsLister appslisters.DaemonSetLister
	// dsStoreSynced returns true if the daemonset store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dsStoreSynced cache.InformerSynced
	// historyLister can list/get history from the shared informer's store
	historyLister appslisters.ControllerRevisionLister
	// historyStoreSynced returns true if the history store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	historyStoreSynced cache.InformerSynced
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister
	// podNodeIndex indexes pods by their nodeName
	podNodeIndex cache.Indexer
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// nodeLister can list/get nodes from the shared informer's store
	nodeLister corelisters.NodeLister
	// nodeStoreSynced returns true if the node store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	nodeStoreSynced cache.InformerSynced

	// DaemonSet keys that need to be synced.
	queue workqueue.RateLimitingInterface

	failedPodsBackoff *flowcontrol.Backoff
}
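
// How the expectations dance plays out (a summary sketch; every call below
// appears later in this file): syncNodes records the number of creates/deletes
// it is about to issue, the pod event handlers acknowledge them as the watch
// events arrive, and syncDaemonSet holds off on further changes in between:
//
//	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff) // syncNodes
//	dsc.expectations.CreationObserved(dsKey)                        // addPod
//	dsc.expectations.DeletionObserved(dsKey)                        // deletePod
//	dsc.expectations.SatisfiedExpectations(dsKey)                   // syncDaemonSet gate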

// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(
	daemonSetInformer appsinformers.DaemonSetInformer,
	historyInformer appsinformers.ControllerRevisionInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dsc := &DaemonSetsController{
		kubeClient:    kubeClient,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		},
		crControl: controller.RealControllerRevisionControl{
			KubeClient: kubeClient,
		},
		burstReplicas: BurstReplicas,
		expectations:  controller.NewControllerExpectations(),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
	}

	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ds := obj.(*apps.DaemonSet)
			klog.V(4).Infof("Adding daemon set %s", ds.Name)
			dsc.enqueueDaemonSet(ds)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldDS := old.(*apps.DaemonSet)
			curDS := cur.(*apps.DaemonSet)
			klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
			dsc.enqueueDaemonSet(curDS)
		},
		DeleteFunc: dsc.deleteDaemonset,
	})
	dsc.dsLister = daemonSetInformer.Lister()
	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addHistory,
		UpdateFunc: dsc.updateHistory,
		DeleteFunc: dsc.deleteHistory,
	})
	dsc.historyLister = historyInformer.Lister()
	dsc.historyStoreSynced = historyInformer.Informer().HasSynced

	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addPod,
		UpdateFunc: dsc.updatePod,
		DeleteFunc: dsc.deletePod,
	})
	dsc.podLister = podInformer.Lister()

	// This custom indexer will index pods by their NodeName, which will decrease the number of pods we need to fetch in simulate() calls.
	podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
		"nodeName": indexByPodNodeName,
	})
	dsc.podNodeIndex = podInformer.Informer().GetIndexer()
	dsc.podStoreSynced = podInformer.Informer().HasSynced

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addNode,
		UpdateFunc: dsc.updateNode,
	})
	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
	dsc.nodeLister = nodeInformer.Lister()

	dsc.syncHandler = dsc.syncDaemonSet
	dsc.enqueueDaemonSet = dsc.enqueue
	dsc.failedPodsBackoff = failedPodsBackoff

	return dsc, nil
}
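
// Illustrative wiring (a sketch, not part of this file): a caller such as
// kube-controller-manager would construct the controller from a shared
// informer factory and run it. The factory import ("k8s.io/client-go/informers"),
// "client", "stopCh", and the backoff bounds are assumptions made for the example.
//
//	factory := informers.NewSharedInformerFactory(client, 0)
//	dsc, err := NewDaemonSetsController(
//		factory.Apps().V1().DaemonSets(),
//		factory.Apps().V1().ControllerRevisions(),
//		factory.Core().V1().Pods(),
//		factory.Core().V1().Nodes(),
//		client,
//		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
//	)
//	if err != nil {
//		klog.Fatal(err)
//	}
//	factory.Start(stopCh)
//	go dsc.Run(2, stopCh)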

func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	// We are only interested in active pods with nodeName set
	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}
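
// Consumers of podNodeIndex look pods up with a plain cache.Indexer call; a
// sketch (assuming "node" is a *v1.Node already in hand):
//
//	objs, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
//	if err != nil {
//		return err
//	}
//	for _, obj := range objs {
//		pod := obj.(*v1.Pod) // only active pods with a NodeName are indexed
//		_ = pod
//	}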

func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
	ds, ok := obj.(*apps.DaemonSet)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		ds, ok = tombstone.Obj.(*apps.DaemonSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a DaemonSet %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Deleting daemon set %s", ds.Name)
	dsc.enqueueDaemonSet(ds)
}

// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dsc.queue.ShutDown()

	klog.Infof("Starting daemon sets controller")
	defer klog.Infof("Shutting down daemon sets controller")

	if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dsc.runWorker, time.Second, stopCh)
	}

	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

	<-stopCh
}

func (dsc *DaemonSetsController) runWorker() {
	for dsc.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
	dsKey, quit := dsc.queue.Get()
	if quit {
		return false
	}
	defer dsc.queue.Done(dsKey)

	err := dsc.syncHandler(dsKey.(string))
	if err == nil {
		dsc.queue.Forget(dsKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", dsKey, err))
	dsc.queue.AddRateLimited(dsKey)

	return true
}
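
// A note on the workqueue semantics above (standard client-go behavior,
// summarized for clarity): Forget only clears the key's rate-limiting history
// so a later failure starts again from the base delay, AddRateLimited
// re-enqueues the key after a backoff chosen by the queue's rate limiter, and
// the deferred Done releases the key so it can be queued and processed again.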

func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.Add(key)
}

func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
	key, err := controller.KeyFunc(ds)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
		return
	}

	dsc.queue.AddRateLimited(key)
}

func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}

	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
	dsc.queue.AddAfter(key, after)
}

// getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
	if err != nil {
		return nil
	}
	if len(sets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
	}
	return sets
}

// getDaemonSetsForHistory returns a list of DaemonSets that potentially
// match a ControllerRevision.
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
	daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
	if err != nil || len(daemonSets) == 0 {
		return nil
	}
	if len(daemonSets) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		klog.V(4).Infof("User error! more than one DaemonSet is selecting ControllerRevision %s/%s with labels: %#v",
			history.Namespace, history.Name, history.Labels)
	}
	return daemonSets
}

// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
// or when the controller manager is restarted.
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
	history := obj.(*apps.ControllerRevision)
	if history.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dsc.deleteHistory(history)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
		ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("ControllerRevision %s added.", history.Name)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	daemonSets := dsc.getDaemonSetsForHistory(history)
	if len(daemonSets) == 0 {
		return
	}
	klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
	for _, ds := range daemonSets {
		dsc.enqueueDaemonSet(ds)
	}
}

// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
// is updated and wakes them up. If anything about the ControllerRevision has changed, we need to awaken
// both the old and new DaemonSets.
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
	curHistory := cur.(*apps.ControllerRevision)
	oldHistory := old.(*apps.ControllerRevision)
	if curHistory.ResourceVersion == oldHistory.ResourceVersion {
		// Periodic resync will send update events for all known ControllerRevisions.
		return
	}

	curControllerRef := metav1.GetControllerOf(curHistory)
	oldControllerRef := metav1.GetControllerOf(oldHistory)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
	if labelChanged || controllerRefChanged {
		daemonSets := dsc.getDaemonSetsForHistory(curHistory)
		if len(daemonSets) == 0 {
			return
		}
		klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
		for _, ds := range daemonSets {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
// the ControllerRevision is deleted. obj could be an *apps.ControllerRevision, or
// a DeletedFinalStateUnknown marker item.
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
	history, ok := obj.(*apps.ControllerRevision)
	// When a delete is dropped, the relist will notice a ControllerRevision in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ControllerRevision
	// changed labels the new DaemonSet will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		history, ok = tombstone.Obj.(*apps.ControllerRevision)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ControllerRevision %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(history)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
	if ds == nil {
		return
	}
	klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
	dsc.enqueueDaemonSet(ds)
}

func (dsc *DaemonSetsController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		dsc.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
		if ds == nil {
			return
		}
		dsKey, err := controller.KeyFunc(ds)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s added.", pod.Name)
		dsc.expectations.CreationObserved(dsKey)
		dsc.enqueueDaemonSet(ds)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	dss := dsc.getDaemonSetsForPod(pod)
	if len(dss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
	for _, ds := range dss {
		dsc.enqueueDaemonSet(ds)
	}
}

// When a pod is updated, figure out what sets manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new set. old and cur must be *v1.Pod types.
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect a ds to create more replicas asap, not wait
		// until the kubelet actually deletes the pod.
		dsc.deletePod(curPod)
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
			dsc.enqueueDaemonSet(ds)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		if ds == nil {
			return
		}
		klog.V(4).Infof("Pod %s updated.", curPod.Name)
		dsc.enqueueDaemonSet(ds)
		changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
		if changedToReady && ds.Spec.MinReadySeconds > 0 {
			// Add a second to avoid milliseconds skew in AddAfter.
			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
			dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	dss := dsc.getDaemonSetsForPod(curPod)
	if len(dss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if labelChanged || controllerRefChanged {
		for _, ds := range dss {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

func (dsc *DaemonSetsController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new daemonset will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
	if ds == nil {
		return
	}
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return
	}
	klog.V(4).Infof("Pod %s deleted.", pod.Name)
	dsc.expectations.DeletionObserved(dsKey)
	dsc.enqueueDaemonSet(ds)
}

func (dsc *DaemonSetsController) addNode(obj interface{}) {
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
		return
	}
	node := obj.(*v1.Node)
	for _, ds := range dsList {
		shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			continue
		}
		if shouldRun {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// nodeInSameCondition returns true if the set of condition types whose Status
// is True is identical between old and cur; otherwise, it returns false.
func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
	if len(old) == 0 && len(cur) == 0 {
		return true
	}

	c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
	for _, c := range old {
		if c.Status == v1.ConditionTrue {
			c1map[c.Type] = c.Status
		}
	}

	for _, c := range cur {
		if c.Status != v1.ConditionTrue {
			continue
		}

		if _, found := c1map[c.Type]; !found {
			return false
		}

		delete(c1map, c.Type)
	}

	return len(c1map) == 0
}

func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
	if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
		return false
	}
	// Mask out ResourceVersion and the (already-compared) conditions, then
	// check whether anything else about the node changed.
	oldNode.ResourceVersion = curNode.ResourceVersion
	oldNode.Status.Conditions = curNode.Status.Conditions
	return apiequality.Semantic.DeepEqual(oldNode, curNode)
}
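
// Worked example of the update filter above (illustrative): a node heartbeat
// that only bumps ResourceVersion and condition timestamps, with Ready staying
// True, is ignored; a Ready condition flipping from True to False, or a change
// to labels or taints, is not ignored, and updateNode below re-evaluates every
// DaemonSet against the node.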

func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)
	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
		return
	}

	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error listing daemon sets: %v", err)
		return
	}
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the updated node (unless it has other work to do, too).
	for _, ds := range dsList {
		oldShouldRun, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
		if err != nil {
			continue
		}
		currentShouldRun, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
		if err != nil {
			continue
		}
		if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
			dsc.enqueueDaemonSet(ds)
		}
	}
}

// getDaemonPods returns daemon pods owned by the given ds.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// List all pods to include those that don't match the selector anymore but
	// have a ControllerRef pointing to this controller.
	pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != ds.UID {
			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
		}
		return fresh, nil
	})

	// Use ControllerRefManager to adopt/orphan as needed.
	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
	return cm.ClaimPods(pods)
}

// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
	claimedPods, err := dsc.getDaemonPods(ds)
	if err != nil {
		return nil, err
	}
	// Group Pods by Node name.
	nodeToDaemonPods := make(map[string][]*v1.Pod)
	for _, pod := range claimedPods {
		nodeName, err := util.GetTargetNodeName(pod)
		if err != nil {
			klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
				pod.Namespace, pod.Name, ds.Namespace, ds.Name)
			continue
		}
		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
	}
	return nodeToDaemonPods, nil
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if ds.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return ds
}

// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
//   - nodesNeedingDaemonPods: the nodes that need to start a daemon pod
//   - podsToDelete: the pods that need to be deleted on the node
//   - err: unexpected error
func (dsc *DaemonSetsController) podsShouldBeOnNode(
	node *v1.Node,
	nodeToDaemonPods map[string][]*v1.Pod,
	ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
	shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
	if err != nil {
		return
	}

	daemonPods, exists := nodeToDaemonPods[node.Name]

	switch {
	case shouldRun && !exists:
		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// If a daemon pod failed, delete it.
		// If no daemon pods are left on this node, they will be created in the next sync loop.
		var daemonPodsRunning []*v1.Pod
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				klog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
		}
		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
		// Sort the daemon pods by creation time, so the oldest is preserved.
		if len(daemonPodsRunning) > 1 {
			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
			for i := 1; i < len(daemonPodsRunning); i++ {
				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
			}
		}
	case !shouldContinueRunning && exists:
		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}

	return nodesNeedingDaemonPods, podsToDelete, nil
}
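
// A note on the failed-pod backoff above (standard client-go flowcontrol
// behavior, summarized for clarity): each Next call roughly doubles the delay
// for the key, capped at the Backoff's configured maximum. With an assumed
// 1s initial delay, repeated failures on one node are retried after roughly
// 1s, 2s, 4s, 8s, ... rather than the controller hot-looping on delete/create
// against a kubelet that keeps rejecting the pod.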

// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but are not yet running one and
// which nodes should not run a Pod of ds but currently are running one, it calls function
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	// Find out the pods which are created for the nodes by DaemonSet.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
	var nodesNeedingDaemonPods, podsToDelete []string
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds)
		if err != nil {
			continue
		}

		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
	}

	// Remove unscheduled pods assigned to nonexistent nodes when daemonset pods are scheduled by the default scheduler.
	// If a node doesn't exist, its pods are never scheduled and can't be deleted by PodGCController.
	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

	// Label new pods using the hash label value of the current history when creating them
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	return nil
}

// syncNodes deletes the given pods and creates new daemon set pods on the given nodes;
// it returns an aggregate of the errors encountered, if any.
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// Error channel to communicate back failures. Make the buffer big enough to avoid any blocking.
	errCh := make(chan error, createDiff+deleteDiff)

	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails. Conveniently, this also prevents the
	// event spam that those failures would generate.
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				podTemplate := template.DeepCopy()
				// The pod's NodeAffinity will be updated to make sure the Pod is bound
				// to the target node by default scheduler. It is safe to do so because there
				// should be no conflicting node affinity with the target node.
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

				err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
					ds, metav1.NewControllerRef(ds, controllerKind))

				if err != nil {
					if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
						// If the namespace is being torn down, we can safely ignore
						// this error since all subsequent creations will fail.
						return
					}
				}
				if err != nil {
					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// Any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - (batchSize + pos)
		if errorCount < len(errCh) && skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}
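
	// Worked example of the slow-start arithmetic above (illustrative): with
	// SlowStartInitialBatchSize = 1 and createDiff = 10, successful batches are
	// sized 1, 2, 4, and finally 3 (clamped to the remaining pods). If a batch
	// reports errors, the loop stops early and the never-attempted pods are
	// subtracted from the expectations so the next sync isn't left waiting for
	// creations that were never issued.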

	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// Collect errors if any for proper reporting/retry logic in the controller.
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error {
	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
		int(ds.Status.NumberReady) == numberReady &&
		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
		int(ds.Status.NumberAvailable) == numberAvailable &&
		int(ds.Status.NumberUnavailable) == numberUnavailable &&
		ds.Status.ObservedGeneration >= ds.Generation {
		return nil
	}

	toUpdate := ds.DeepCopy()

	var updateErr, getErr error
	for i := 0; i < StatusUpdateRetries; i++ {
		if updateObservedGen {
			toUpdate.Status.ObservedGeneration = ds.Generation
		}
		toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
		toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
		toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
		toUpdate.Status.NumberReady = int32(numberReady)
		toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
		toUpdate.Status.NumberAvailable = int32(numberAvailable)
		toUpdate.Status.NumberUnavailable = int32(numberUnavailable)

		if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil {
			return nil
		}

		// Update the set with the latest resource version for the next poll
		if toUpdate, getErr = dsClient.Get(context.TODO(), ds.Name, metav1.GetOptions{}); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			return getErr
		}
	}
	return updateErr
}

func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
	klog.V(4).Infof("Updating daemon set status")
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
	for _, node := range nodeList {
		shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
		if err != nil {
			return err
		}

		scheduled := len(nodeToDaemonPods[node.Name]) > 0

		if shouldRun {
			desiredNumberScheduled++
			if scheduled {
				currentNumberScheduled++
				// Sort the daemon pods by creation time, so that the oldest is first.
				daemonPods := nodeToDaemonPods[node.Name]
				sort.Sort(podByCreationTimestampAndPhase(daemonPods))
				pod := daemonPods[0]
				if podutil.IsPodReady(pod) {
					numberReady++
					if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
						numberAvailable++
					}
				}
				// If the returned error is not nil we have a parse error.
				// The controller handles this via the hash.
				generation, err := util.GetTemplateGeneration(ds)
				if err != nil {
					generation = nil
				}
				if util.IsPodUpdated(pod, hash, generation) {
					updatedNumberScheduled++
				}
			}
		} else {
			if scheduled {
				numberMisscheduled++
			}
		}
	}
	numberUnavailable := desiredNumberScheduled - numberAvailable

	err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
	if err != nil {
		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
	}

	// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
		dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
	}
	return nil
}

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(3).Infof("daemon set has been deleted %v", key)
		dsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
	}

	nodeList, err := dsc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Don't process a daemon set until all its creations and deletions have been processed.
	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
	// then we do not want to call manage on foo until the daemon pods have been created.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	// If the DaemonSet is being deleted (either by foreground deletion or
	// orphan deletion), we cannot be sure if the DaemonSet history objects
	// it owned still exist -- those history objects can either be deleted
	// or orphaned. Garbage collector doesn't guarantee that it will delete
	// DaemonSet pods before deleting DaemonSet history objects, because
	// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
	// calculate the status of a DaemonSet being deleted. Therefore, return
	// here without updating status for the DaemonSet being deleted.
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Construct histories of the DaemonSet, and get the hash of current history
	cur, old, err := dsc.constructHistory(ds)
	if err != nil {
		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
	}
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
	}

	err = dsc.manage(ds, nodeList, hash)
	if err != nil {
		return err
	}

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
			// Nothing to do here; with OnDelete, new pods are created only after the user deletes old ones.
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ds, nodeList, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
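
// In short, one sync pass is: resolve the key to a DaemonSet, snapshot the
// node list, build/refresh ControllerRevision history to obtain the current
// hash, reconcile pods via manage, run the rolling update when that strategy
// is selected and expectations are satisfied, prune old history, and finally
// write status back (raising observedGeneration only on a full pass).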

// nodeShouldRunDaemonPod checks a set of preconditions against a (node, daemonset) pair and returns a
// summary. Returned booleans are:
//   - shouldRun:
//     Returns true when a daemonset should run on the node if a daemonset pod is not already
//     running on that node.
//   - shouldContinueRunning:
//     Returns true when a daemonset should continue running on a node if a daemonset pod is already
//     running on that node.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
	pod := NewPod(ds, node.Name)

	// If the daemon set specifies a node name, check that it matches with node.Name.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false, nil
	}

	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(node)
	taints, err := nodeInfo.Taints()
	if err != nil {
		klog.Warningf("failed to get node %q taints: %v", node.Name, err)
		return false, false, err
	}

	fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
	if !fitsNodeName || !fitsNodeAffinity {
		return false, false, nil
	}

	if !fitsTaints {
		// Scheduled daemon pods should continue running if they tolerate the NoExecute taint.
		shouldContinueRunning := v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
			return t.Effect == v1.TaintEffectNoExecute
		})
		return false, shouldContinueRunning, nil
	}

	return true, true, nil
}
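
// Concrete outcomes of the logic above (illustrative): a NoSchedule taint the
// pod doesn't tolerate yields (false, true), so no new pod is placed but an
// existing one keeps running; an untolerated NoExecute taint yields
// (false, false), so an existing daemon pod will be deleted; a node matching
// node name, affinity, and all taints yields (true, true).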

// Predicates checks if a DaemonSet's pod can run on a node.
func Predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
	fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
	fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
	fitsTaints = v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
		return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
	})
	return
}

// NewPod creates a new daemon pod for the given DaemonSet, targeted at nodeName.
func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
	newPod.Namespace = ds.Namespace
	newPod.Spec.NodeName = nodeName

	// Added default tolerations for DaemonSet pods.
	util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)

	return newPod
}

type podByCreationTimestampAndPhase []*v1.Pod

func (o podByCreationTimestampAndPhase) Len() int      { return len(o) }
func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
	// Scheduled Pods come first.
	if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
		return true
	}

	if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
		return false
	}

	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}

	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
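
// Resulting order (illustrative): given a scheduled pod created at t=2, an
// unscheduled pod created at t=1, and a scheduled pod created at t=1, the sort
// yields [scheduled@t=1, scheduled@t=2, unscheduled@t=1]: scheduled before
// unscheduled, then oldest first, with the name as a tie-breaker. This is what
// lets podsShouldBeOnNode keep the oldest pod and delete the excess ones.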

func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
}

// getUnscheduledPodsWithoutNode returns a list of unscheduled pods assigned to nonexistent nodes.
// Returned pods can't be deleted by PodGCController, so they should be deleted by DaemonSetController.
func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
	var results []string
	isNodeRunning := make(map[string]bool)
	for _, node := range runningNodesList {
		isNodeRunning[node.Name] = true
	}
	for n, pods := range nodeToDaemonPods {
		if !isNodeRunning[n] {
			for _, pod := range pods {
				if len(pod.Spec.NodeName) == 0 {
					results = append(results, pod.Name)
				}
			}
		}
	}
	return results
}