deployment_controller.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package deployment contains all the logic for handling Kubernetes Deployments.
  14. // It implements a set of strategies (rolling, recreate) for deploying an application,
  15. // the means to rollback to previous versions, proportional scaling for mitigating
  16. // risk, cleanup policy, and other useful features of Deployments.
  17. package deployment
  18. import (
  19. "context"
  20. "fmt"
  21. "reflect"
  22. "time"
  23. "k8s.io/klog"
  24. apps "k8s.io/api/apps/v1"
  25. "k8s.io/api/core/v1"
  26. "k8s.io/apimachinery/pkg/api/errors"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/labels"
  29. "k8s.io/apimachinery/pkg/types"
  30. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. appsinformers "k8s.io/client-go/informers/apps/v1"
  33. coreinformers "k8s.io/client-go/informers/core/v1"
  34. clientset "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/kubernetes/scheme"
  36. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  37. appslisters "k8s.io/client-go/listers/apps/v1"
  38. corelisters "k8s.io/client-go/listers/core/v1"
  39. "k8s.io/client-go/tools/cache"
  40. "k8s.io/client-go/tools/record"
  41. "k8s.io/client-go/util/workqueue"
  42. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  43. "k8s.io/kubernetes/pkg/controller"
  44. "k8s.io/kubernetes/pkg/controller/deployment/util"
  45. )
  46. const (
  47. // maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
  48. // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
  49. // a deployment is going to be requeued:
  50. //
  51. // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
  52. maxRetries = 15
  53. )
  54. // controllerKind contains the schema.GroupVersionKind for this controller type.
  55. var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
//
// Event handlers registered in NewDeploymentController push deployment keys
// onto queue; worker goroutines started by Run pop them and call syncHandler.
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	// client talks to the API server directly, e.g. for status updates and
	// uncached re-reads that must bypass the informer caches.
	client clientset.Interface
	// eventRecorder emits events with source component "deployment-controller".
	eventRecorder record.EventRecorder

	// syncHandler processes one deployment key. It defaults to syncDeployment
	// and is a field so tests can inject a replacement.
	syncHandler func(dKey string) error
	// enqueueDeployment adds a deployment to the queue. It defaults to enqueue
	// and is a field so unit tests can intercept enqueues.
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// queue holds the keys (controller.KeyFunc, i.e. "namespace/name") of
	// Deployments that need to be synced.
	queue workqueue.RateLimitingInterface
}
  85. // NewDeploymentController creates a new DeploymentController.
  86. func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
  87. eventBroadcaster := record.NewBroadcaster()
  88. eventBroadcaster.StartLogging(klog.Infof)
  89. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
  90. if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
  91. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  92. return nil, err
  93. }
  94. }
  95. dc := &DeploymentController{
  96. client: client,
  97. eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
  98. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
  99. }
  100. dc.rsControl = controller.RealRSControl{
  101. KubeClient: client,
  102. Recorder: dc.eventRecorder,
  103. }
  104. dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  105. AddFunc: dc.addDeployment,
  106. UpdateFunc: dc.updateDeployment,
  107. // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
  108. DeleteFunc: dc.deleteDeployment,
  109. })
  110. rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  111. AddFunc: dc.addReplicaSet,
  112. UpdateFunc: dc.updateReplicaSet,
  113. DeleteFunc: dc.deleteReplicaSet,
  114. })
  115. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  116. DeleteFunc: dc.deletePod,
  117. })
  118. dc.syncHandler = dc.syncDeployment
  119. dc.enqueueDeployment = dc.enqueue
  120. dc.dLister = dInformer.Lister()
  121. dc.rsLister = rsInformer.Lister()
  122. dc.podLister = podInformer.Lister()
  123. dc.dListerSynced = dInformer.Informer().HasSynced
  124. dc.rsListerSynced = rsInformer.Informer().HasSynced
  125. dc.podListerSynced = podInformer.Informer().HasSynced
  126. return dc, nil
  127. }
  128. // Run begins watching and syncing.
  129. func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
  130. defer utilruntime.HandleCrash()
  131. defer dc.queue.ShutDown()
  132. klog.Infof("Starting deployment controller")
  133. defer klog.Infof("Shutting down deployment controller")
  134. if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
  135. return
  136. }
  137. for i := 0; i < workers; i++ {
  138. go wait.Until(dc.worker, time.Second, stopCh)
  139. }
  140. <-stopCh
  141. }
  142. func (dc *DeploymentController) addDeployment(obj interface{}) {
  143. d := obj.(*apps.Deployment)
  144. klog.V(4).Infof("Adding deployment %s", d.Name)
  145. dc.enqueueDeployment(d)
  146. }
  147. func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
  148. oldD := old.(*apps.Deployment)
  149. curD := cur.(*apps.Deployment)
  150. klog.V(4).Infof("Updating deployment %s", oldD.Name)
  151. dc.enqueueDeployment(curD)
  152. }
  153. func (dc *DeploymentController) deleteDeployment(obj interface{}) {
  154. d, ok := obj.(*apps.Deployment)
  155. if !ok {
  156. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  157. if !ok {
  158. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  159. return
  160. }
  161. d, ok = tombstone.Obj.(*apps.Deployment)
  162. if !ok {
  163. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
  164. return
  165. }
  166. }
  167. klog.V(4).Infof("Deleting deployment %s", d.Name)
  168. dc.enqueueDeployment(d)
  169. }
  170. // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
  171. func (dc *DeploymentController) addReplicaSet(obj interface{}) {
  172. rs := obj.(*apps.ReplicaSet)
  173. if rs.DeletionTimestamp != nil {
  174. // On a restart of the controller manager, it's possible for an object to
  175. // show up in a state that is already pending deletion.
  176. dc.deleteReplicaSet(rs)
  177. return
  178. }
  179. // If it has a ControllerRef, that's all that matters.
  180. if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
  181. d := dc.resolveControllerRef(rs.Namespace, controllerRef)
  182. if d == nil {
  183. return
  184. }
  185. klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
  186. dc.enqueueDeployment(d)
  187. return
  188. }
  189. // Otherwise, it's an orphan. Get a list of all matching Deployments and sync
  190. // them to see if anyone wants to adopt it.
  191. ds := dc.getDeploymentsForReplicaSet(rs)
  192. if len(ds) == 0 {
  193. return
  194. }
  195. klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
  196. for _, d := range ds {
  197. dc.enqueueDeployment(d)
  198. }
  199. }
  200. // getDeploymentsForReplicaSet returns a list of Deployments that potentially
  201. // match a ReplicaSet.
  202. func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
  203. deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
  204. if err != nil || len(deployments) == 0 {
  205. return nil
  206. }
  207. // Because all ReplicaSet's belonging to a deployment should have a unique label key,
  208. // there should never be more than one deployment returned by the above method.
  209. // If that happens we should probably dynamically repair the situation by ultimately
  210. // trying to clean up one of the controllers, for now we just return the older one
  211. if len(deployments) > 1 {
  212. // ControllerRef will ensure we don't do anything crazy, but more than one
  213. // item in this list nevertheless constitutes user error.
  214. klog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
  215. rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
  216. }
  217. return deployments
  218. }
  219. // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
  220. // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
  221. // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet
  222. // types.
  223. func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
  224. curRS := cur.(*apps.ReplicaSet)
  225. oldRS := old.(*apps.ReplicaSet)
  226. if curRS.ResourceVersion == oldRS.ResourceVersion {
  227. // Periodic resync will send update events for all known replica sets.
  228. // Two different versions of the same replica set will always have different RVs.
  229. return
  230. }
  231. curControllerRef := metav1.GetControllerOf(curRS)
  232. oldControllerRef := metav1.GetControllerOf(oldRS)
  233. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  234. if controllerRefChanged && oldControllerRef != nil {
  235. // The ControllerRef was changed. Sync the old controller, if any.
  236. if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
  237. dc.enqueueDeployment(d)
  238. }
  239. }
  240. // If it has a ControllerRef, that's all that matters.
  241. if curControllerRef != nil {
  242. d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
  243. if d == nil {
  244. return
  245. }
  246. klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
  247. dc.enqueueDeployment(d)
  248. return
  249. }
  250. // Otherwise, it's an orphan. If anything changed, sync matching controllers
  251. // to see if anyone wants to adopt it now.
  252. labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
  253. if labelChanged || controllerRefChanged {
  254. ds := dc.getDeploymentsForReplicaSet(curRS)
  255. if len(ds) == 0 {
  256. return
  257. }
  258. klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
  259. for _, d := range ds {
  260. dc.enqueueDeployment(d)
  261. }
  262. }
  263. }
  264. // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
  265. // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
  266. // a DeletionFinalStateUnknown marker item.
  267. func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
  268. rs, ok := obj.(*apps.ReplicaSet)
  269. // When a delete is dropped, the relist will notice a pod in the store not
  270. // in the list, leading to the insertion of a tombstone object which contains
  271. // the deleted key/value. Note that this value might be stale. If the ReplicaSet
  272. // changed labels the new deployment will not be woken up till the periodic resync.
  273. if !ok {
  274. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  275. if !ok {
  276. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  277. return
  278. }
  279. rs, ok = tombstone.Obj.(*apps.ReplicaSet)
  280. if !ok {
  281. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
  282. return
  283. }
  284. }
  285. controllerRef := metav1.GetControllerOf(rs)
  286. if controllerRef == nil {
  287. // No controller should care about orphans being deleted.
  288. return
  289. }
  290. d := dc.resolveControllerRef(rs.Namespace, controllerRef)
  291. if d == nil {
  292. return
  293. }
  294. klog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
  295. dc.enqueueDeployment(d)
  296. }
  297. // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
  298. func (dc *DeploymentController) deletePod(obj interface{}) {
  299. pod, ok := obj.(*v1.Pod)
  300. // When a delete is dropped, the relist will notice a pod in the store not
  301. // in the list, leading to the insertion of a tombstone object which contains
  302. // the deleted key/value. Note that this value might be stale. If the Pod
  303. // changed labels the new deployment will not be woken up till the periodic resync.
  304. if !ok {
  305. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  306. if !ok {
  307. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  308. return
  309. }
  310. pod, ok = tombstone.Obj.(*v1.Pod)
  311. if !ok {
  312. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
  313. return
  314. }
  315. }
  316. klog.V(4).Infof("Pod %s deleted.", pod.Name)
  317. if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
  318. // Sync if this Deployment now has no more Pods.
  319. rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
  320. if err != nil {
  321. return
  322. }
  323. podMap, err := dc.getPodMapForDeployment(d, rsList)
  324. if err != nil {
  325. return
  326. }
  327. numPods := 0
  328. for _, podList := range podMap {
  329. numPods += len(podList)
  330. }
  331. if numPods == 0 {
  332. dc.enqueueDeployment(d)
  333. }
  334. }
  335. }
  336. func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
  337. key, err := controller.KeyFunc(deployment)
  338. if err != nil {
  339. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  340. return
  341. }
  342. dc.queue.Add(key)
  343. }
  344. func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
  345. key, err := controller.KeyFunc(deployment)
  346. if err != nil {
  347. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  348. return
  349. }
  350. dc.queue.AddRateLimited(key)
  351. }
  352. // enqueueAfter will enqueue a deployment after the provided amount of time.
  353. func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
  354. key, err := controller.KeyFunc(deployment)
  355. if err != nil {
  356. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  357. return
  358. }
  359. dc.queue.AddAfter(key, after)
  360. }
  361. // getDeploymentForPod returns the deployment managing the given Pod.
  362. func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment {
  363. // Find the owning replica set
  364. var rs *apps.ReplicaSet
  365. var err error
  366. controllerRef := metav1.GetControllerOf(pod)
  367. if controllerRef == nil {
  368. // No controller owns this Pod.
  369. return nil
  370. }
  371. if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
  372. // Not a pod owned by a replica set.
  373. return nil
  374. }
  375. rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
  376. if err != nil || rs.UID != controllerRef.UID {
  377. klog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
  378. return nil
  379. }
  380. // Now find the Deployment that owns that ReplicaSet.
  381. controllerRef = metav1.GetControllerOf(rs)
  382. if controllerRef == nil {
  383. return nil
  384. }
  385. return dc.resolveControllerRef(rs.Namespace, controllerRef)
  386. }
  387. // resolveControllerRef returns the controller referenced by a ControllerRef,
  388. // or nil if the ControllerRef could not be resolved to a matching controller
  389. // of the correct Kind.
  390. func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
  391. // We can't look up by UID, so look up by Name and then verify UID.
  392. // Don't even try to look up by Name if it's the wrong Kind.
  393. if controllerRef.Kind != controllerKind.Kind {
  394. return nil
  395. }
  396. d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
  397. if err != nil {
  398. return nil
  399. }
  400. if d.UID != controllerRef.UID {
  401. // The controller we found with this Name is not the same one that the
  402. // ControllerRef points to.
  403. return nil
  404. }
  405. return d
  406. }
  407. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  408. // It enforces that the syncHandler is never invoked concurrently with the same key.
  409. func (dc *DeploymentController) worker() {
  410. for dc.processNextWorkItem() {
  411. }
  412. }
  413. func (dc *DeploymentController) processNextWorkItem() bool {
  414. key, quit := dc.queue.Get()
  415. if quit {
  416. return false
  417. }
  418. defer dc.queue.Done(key)
  419. err := dc.syncHandler(key.(string))
  420. dc.handleErr(err, key)
  421. return true
  422. }
  423. func (dc *DeploymentController) handleErr(err error, key interface{}) {
  424. if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  425. dc.queue.Forget(key)
  426. return
  427. }
  428. if dc.queue.NumRequeues(key) < maxRetries {
  429. klog.V(2).Infof("Error syncing deployment %v: %v", key, err)
  430. dc.queue.AddRateLimited(key)
  431. return
  432. }
  433. utilruntime.HandleError(err)
  434. klog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
  435. dc.queue.Forget(key)
  436. }
  437. // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
  438. // ControllerRef by adopting and orphaning.
  439. // It returns the list of ReplicaSets that this Deployment should manage.
  440. func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {
  441. // List all ReplicaSets to find those we own but that no longer match our
  442. // selector. They will be orphaned by ClaimReplicaSets().
  443. rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
  444. if err != nil {
  445. return nil, err
  446. }
  447. deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
  448. if err != nil {
  449. return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
  450. }
  451. // If any adoptions are attempted, we should first recheck for deletion with
  452. // an uncached quorum read sometime after listing ReplicaSets (see #42639).
  453. canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
  454. fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(context.TODO(), d.Name, metav1.GetOptions{})
  455. if err != nil {
  456. return nil, err
  457. }
  458. if fresh.UID != d.UID {
  459. return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
  460. }
  461. return fresh, nil
  462. })
  463. cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
  464. return cm.ClaimReplicaSets(rsList)
  465. }
  466. // getPodMapForDeployment returns the Pods managed by a Deployment.
  467. //
  468. // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
  469. // according to the Pod's ControllerRef.
  470. // NOTE: The pod pointers returned by this method point the pod objects in the cache and thus
  471. // shouldn't be modified in any way.
  472. func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
  473. // Get all Pods that potentially belong to this Deployment.
  474. selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
  475. if err != nil {
  476. return nil, err
  477. }
  478. pods, err := dc.podLister.Pods(d.Namespace).List(selector)
  479. if err != nil {
  480. return nil, err
  481. }
  482. // Group Pods by their controller (if it's in rsList).
  483. podMap := make(map[types.UID][]*v1.Pod, len(rsList))
  484. for _, rs := range rsList {
  485. podMap[rs.UID] = []*v1.Pod{}
  486. }
  487. for _, pod := range pods {
  488. // Do not ignore inactive Pods because Recreate Deployments need to verify that no
  489. // Pods from older versions are running before spinning up new Pods.
  490. controllerRef := metav1.GetControllerOf(pod)
  491. if controllerRef == nil {
  492. continue
  493. }
  494. // Only append if we care about this UID.
  495. if _, ok := podMap[controllerRef.UID]; ok {
  496. podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
  497. }
  498. }
  499. return podMap, nil
  500. }
  501. // syncDeployment will sync the deployment with the given key.
  502. // This function is not meant to be invoked concurrently with the same key.
  503. func (dc *DeploymentController) syncDeployment(key string) error {
  504. startTime := time.Now()
  505. klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
  506. defer func() {
  507. klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
  508. }()
  509. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  510. if err != nil {
  511. return err
  512. }
  513. deployment, err := dc.dLister.Deployments(namespace).Get(name)
  514. if errors.IsNotFound(err) {
  515. klog.V(2).Infof("Deployment %v has been deleted", key)
  516. return nil
  517. }
  518. if err != nil {
  519. return err
  520. }
  521. // Deep-copy otherwise we are mutating our cache.
  522. // TODO: Deep-copy only when needed.
  523. d := deployment.DeepCopy()
  524. everything := metav1.LabelSelector{}
  525. if reflect.DeepEqual(d.Spec.Selector, &everything) {
  526. dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
  527. if d.Status.ObservedGeneration < d.Generation {
  528. d.Status.ObservedGeneration = d.Generation
  529. dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
  530. }
  531. return nil
  532. }
  533. // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
  534. // through adoption/orphaning.
  535. rsList, err := dc.getReplicaSetsForDeployment(d)
  536. if err != nil {
  537. return err
  538. }
  539. // List all Pods owned by this Deployment, grouped by their ReplicaSet.
  540. // Current uses of the podMap are:
  541. //
  542. // * check if a Pod is labeled correctly with the pod-template-hash label.
  543. // * check that no old Pods are running in the middle of Recreate Deployments.
  544. podMap, err := dc.getPodMapForDeployment(d, rsList)
  545. if err != nil {
  546. return err
  547. }
  548. if d.DeletionTimestamp != nil {
  549. return dc.syncStatusOnly(d, rsList)
  550. }
  551. // Update deployment conditions with an Unknown condition when pausing/resuming
  552. // a deployment. In this way, we can be sure that we won't timeout when a user
  553. // resumes a Deployment with a set progressDeadlineSeconds.
  554. if err = dc.checkPausedConditions(d); err != nil {
  555. return err
  556. }
  557. if d.Spec.Paused {
  558. return dc.sync(d, rsList)
  559. }
  560. // rollback is not re-entrant in case the underlying replica sets are updated with a new
  561. // revision so we should ensure that we won't proceed to update replica sets until we
  562. // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
  563. if getRollbackTo(d) != nil {
  564. return dc.rollback(d, rsList)
  565. }
  566. scalingEvent, err := dc.isScalingEvent(d, rsList)
  567. if err != nil {
  568. return err
  569. }
  570. if scalingEvent {
  571. return dc.sync(d, rsList)
  572. }
  573. switch d.Spec.Strategy.Type {
  574. case apps.RecreateDeploymentStrategyType:
  575. return dc.rolloutRecreate(d, rsList, podMap)
  576. case apps.RollingUpdateDeploymentStrategyType:
  577. return dc.rolloutRolling(d, rsList)
  578. }
  579. return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
  580. }