deployment_controller.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package deployment contains all the logic for handling Kubernetes Deployments.
  14. // It implements a set of strategies (rolling, recreate) for deploying an application,
  15. // the means to rollback to previous versions, proportional scaling for mitigating
  16. // risk, cleanup policy, and other useful features of Deployments.
  17. package deployment
  18. import (
  19. "fmt"
  20. "reflect"
  21. "time"
  22. "k8s.io/klog"
  23. apps "k8s.io/api/apps/v1"
  24. "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/api/errors"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/labels"
  28. "k8s.io/apimachinery/pkg/types"
  29. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. appsinformers "k8s.io/client-go/informers/apps/v1"
  32. coreinformers "k8s.io/client-go/informers/core/v1"
  33. clientset "k8s.io/client-go/kubernetes"
  34. "k8s.io/client-go/kubernetes/scheme"
  35. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  36. appslisters "k8s.io/client-go/listers/apps/v1"
  37. corelisters "k8s.io/client-go/listers/core/v1"
  38. "k8s.io/client-go/tools/cache"
  39. "k8s.io/client-go/tools/record"
  40. "k8s.io/client-go/util/workqueue"
  41. "k8s.io/kubernetes/pkg/controller"
  42. "k8s.io/kubernetes/pkg/controller/deployment/util"
  43. "k8s.io/kubernetes/pkg/util/metrics"
  44. )
  // maxRetries bounds how many times a deployment key is requeued after a failed
  // sync before handleErr drops it from the queue.
  //
  // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the successive
  // requeue delays are:
  //
  // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
  const maxRetries = 15
  53. // controllerKind contains the schema.GroupVersionKind for this controller type.
  54. var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
  55. // DeploymentController is responsible for synchronizing Deployment objects stored
  56. // in the system with actual running replica sets and pods.
  57. type DeploymentController struct {
  58. // rsControl is used for adopting/releasing replica sets.
  59. rsControl controller.RSControlInterface
  60. client clientset.Interface
  61. eventRecorder record.EventRecorder
  62. // To allow injection of syncDeployment for testing.
  63. syncHandler func(dKey string) error
  64. // used for unit testing
  65. enqueueDeployment func(deployment *apps.Deployment)
  66. // dLister can list/get deployments from the shared informer's store
  67. dLister appslisters.DeploymentLister
  68. // rsLister can list/get replica sets from the shared informer's store
  69. rsLister appslisters.ReplicaSetLister
  70. // podLister can list/get pods from the shared informer's store
  71. podLister corelisters.PodLister
  72. // dListerSynced returns true if the Deployment store has been synced at least once.
  73. // Added as a member to the struct to allow injection for testing.
  74. dListerSynced cache.InformerSynced
  75. // rsListerSynced returns true if the ReplicaSet store has been synced at least once.
  76. // Added as a member to the struct to allow injection for testing.
  77. rsListerSynced cache.InformerSynced
  78. // podListerSynced returns true if the pod store has been synced at least once.
  79. // Added as a member to the struct to allow injection for testing.
  80. podListerSynced cache.InformerSynced
  81. // Deployments that need to be synced
  82. queue workqueue.RateLimitingInterface
  83. }
  84. // NewDeploymentController creates a new DeploymentController.
  85. func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
  86. eventBroadcaster := record.NewBroadcaster()
  87. eventBroadcaster.StartLogging(klog.Infof)
  88. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
  89. if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
  90. if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  91. return nil, err
  92. }
  93. }
  94. dc := &DeploymentController{
  95. client: client,
  96. eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
  97. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
  98. }
  99. dc.rsControl = controller.RealRSControl{
  100. KubeClient: client,
  101. Recorder: dc.eventRecorder,
  102. }
  103. dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  104. AddFunc: dc.addDeployment,
  105. UpdateFunc: dc.updateDeployment,
  106. // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
  107. DeleteFunc: dc.deleteDeployment,
  108. })
  109. rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  110. AddFunc: dc.addReplicaSet,
  111. UpdateFunc: dc.updateReplicaSet,
  112. DeleteFunc: dc.deleteReplicaSet,
  113. })
  114. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  115. DeleteFunc: dc.deletePod,
  116. })
  117. dc.syncHandler = dc.syncDeployment
  118. dc.enqueueDeployment = dc.enqueue
  119. dc.dLister = dInformer.Lister()
  120. dc.rsLister = rsInformer.Lister()
  121. dc.podLister = podInformer.Lister()
  122. dc.dListerSynced = dInformer.Informer().HasSynced
  123. dc.rsListerSynced = rsInformer.Informer().HasSynced
  124. dc.podListerSynced = podInformer.Informer().HasSynced
  125. return dc, nil
  126. }
  127. // Run begins watching and syncing.
  128. func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
  129. defer utilruntime.HandleCrash()
  130. defer dc.queue.ShutDown()
  131. klog.Infof("Starting deployment controller")
  132. defer klog.Infof("Shutting down deployment controller")
  133. if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
  134. return
  135. }
  136. for i := 0; i < workers; i++ {
  137. go wait.Until(dc.worker, time.Second, stopCh)
  138. }
  139. <-stopCh
  140. }
  141. func (dc *DeploymentController) addDeployment(obj interface{}) {
  142. d := obj.(*apps.Deployment)
  143. klog.V(4).Infof("Adding deployment %s", d.Name)
  144. dc.enqueueDeployment(d)
  145. }
  146. func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
  147. oldD := old.(*apps.Deployment)
  148. curD := cur.(*apps.Deployment)
  149. klog.V(4).Infof("Updating deployment %s", oldD.Name)
  150. dc.enqueueDeployment(curD)
  151. }
  152. func (dc *DeploymentController) deleteDeployment(obj interface{}) {
  153. d, ok := obj.(*apps.Deployment)
  154. if !ok {
  155. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  156. if !ok {
  157. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  158. return
  159. }
  160. d, ok = tombstone.Obj.(*apps.Deployment)
  161. if !ok {
  162. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
  163. return
  164. }
  165. }
  166. klog.V(4).Infof("Deleting deployment %s", d.Name)
  167. dc.enqueueDeployment(d)
  168. }
  169. // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
  170. func (dc *DeploymentController) addReplicaSet(obj interface{}) {
  171. rs := obj.(*apps.ReplicaSet)
  172. if rs.DeletionTimestamp != nil {
  173. // On a restart of the controller manager, it's possible for an object to
  174. // show up in a state that is already pending deletion.
  175. dc.deleteReplicaSet(rs)
  176. return
  177. }
  178. // If it has a ControllerRef, that's all that matters.
  179. if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
  180. d := dc.resolveControllerRef(rs.Namespace, controllerRef)
  181. if d == nil {
  182. return
  183. }
  184. klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
  185. dc.enqueueDeployment(d)
  186. return
  187. }
  188. // Otherwise, it's an orphan. Get a list of all matching Deployments and sync
  189. // them to see if anyone wants to adopt it.
  190. ds := dc.getDeploymentsForReplicaSet(rs)
  191. if len(ds) == 0 {
  192. return
  193. }
  194. klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
  195. for _, d := range ds {
  196. dc.enqueueDeployment(d)
  197. }
  198. }
  199. // getDeploymentsForReplicaSet returns a list of Deployments that potentially
  200. // match a ReplicaSet.
  201. func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
  202. deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
  203. if err != nil || len(deployments) == 0 {
  204. return nil
  205. }
  206. // Because all ReplicaSet's belonging to a deployment should have a unique label key,
  207. // there should never be more than one deployment returned by the above method.
  208. // If that happens we should probably dynamically repair the situation by ultimately
  209. // trying to clean up one of the controllers, for now we just return the older one
  210. if len(deployments) > 1 {
  211. // ControllerRef will ensure we don't do anything crazy, but more than one
  212. // item in this list nevertheless constitutes user error.
  213. klog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
  214. rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
  215. }
  216. return deployments
  217. }
  218. // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
  219. // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
  220. // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet
  221. // types.
  222. func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
  223. curRS := cur.(*apps.ReplicaSet)
  224. oldRS := old.(*apps.ReplicaSet)
  225. if curRS.ResourceVersion == oldRS.ResourceVersion {
  226. // Periodic resync will send update events for all known replica sets.
  227. // Two different versions of the same replica set will always have different RVs.
  228. return
  229. }
  230. curControllerRef := metav1.GetControllerOf(curRS)
  231. oldControllerRef := metav1.GetControllerOf(oldRS)
  232. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  233. if controllerRefChanged && oldControllerRef != nil {
  234. // The ControllerRef was changed. Sync the old controller, if any.
  235. if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
  236. dc.enqueueDeployment(d)
  237. }
  238. }
  239. // If it has a ControllerRef, that's all that matters.
  240. if curControllerRef != nil {
  241. d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
  242. if d == nil {
  243. return
  244. }
  245. klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
  246. dc.enqueueDeployment(d)
  247. return
  248. }
  249. // Otherwise, it's an orphan. If anything changed, sync matching controllers
  250. // to see if anyone wants to adopt it now.
  251. labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
  252. if labelChanged || controllerRefChanged {
  253. ds := dc.getDeploymentsForReplicaSet(curRS)
  254. if len(ds) == 0 {
  255. return
  256. }
  257. klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
  258. for _, d := range ds {
  259. dc.enqueueDeployment(d)
  260. }
  261. }
  262. }
  263. // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
  264. // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
  265. // a DeletionFinalStateUnknown marker item.
  266. func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
  267. rs, ok := obj.(*apps.ReplicaSet)
  268. // When a delete is dropped, the relist will notice a pod in the store not
  269. // in the list, leading to the insertion of a tombstone object which contains
  270. // the deleted key/value. Note that this value might be stale. If the ReplicaSet
  271. // changed labels the new deployment will not be woken up till the periodic resync.
  272. if !ok {
  273. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  274. if !ok {
  275. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  276. return
  277. }
  278. rs, ok = tombstone.Obj.(*apps.ReplicaSet)
  279. if !ok {
  280. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
  281. return
  282. }
  283. }
  284. controllerRef := metav1.GetControllerOf(rs)
  285. if controllerRef == nil {
  286. // No controller should care about orphans being deleted.
  287. return
  288. }
  289. d := dc.resolveControllerRef(rs.Namespace, controllerRef)
  290. if d == nil {
  291. return
  292. }
  293. klog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
  294. dc.enqueueDeployment(d)
  295. }
  296. // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
  297. func (dc *DeploymentController) deletePod(obj interface{}) {
  298. pod, ok := obj.(*v1.Pod)
  299. // When a delete is dropped, the relist will notice a pod in the store not
  300. // in the list, leading to the insertion of a tombstone object which contains
  301. // the deleted key/value. Note that this value might be stale. If the Pod
  302. // changed labels the new deployment will not be woken up till the periodic resync.
  303. if !ok {
  304. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  305. if !ok {
  306. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  307. return
  308. }
  309. pod, ok = tombstone.Obj.(*v1.Pod)
  310. if !ok {
  311. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
  312. return
  313. }
  314. }
  315. klog.V(4).Infof("Pod %s deleted.", pod.Name)
  316. if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
  317. // Sync if this Deployment now has no more Pods.
  318. rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
  319. if err != nil {
  320. return
  321. }
  322. podMap, err := dc.getPodMapForDeployment(d, rsList)
  323. if err != nil {
  324. return
  325. }
  326. numPods := 0
  327. for _, podList := range podMap {
  328. numPods += len(podList.Items)
  329. }
  330. if numPods == 0 {
  331. dc.enqueueDeployment(d)
  332. }
  333. }
  334. }
  335. func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
  336. key, err := controller.KeyFunc(deployment)
  337. if err != nil {
  338. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  339. return
  340. }
  341. dc.queue.Add(key)
  342. }
  343. func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
  344. key, err := controller.KeyFunc(deployment)
  345. if err != nil {
  346. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  347. return
  348. }
  349. dc.queue.AddRateLimited(key)
  350. }
  351. // enqueueAfter will enqueue a deployment after the provided amount of time.
  352. func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
  353. key, err := controller.KeyFunc(deployment)
  354. if err != nil {
  355. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
  356. return
  357. }
  358. dc.queue.AddAfter(key, after)
  359. }
  360. // getDeploymentForPod returns the deployment managing the given Pod.
  361. func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment {
  362. // Find the owning replica set
  363. var rs *apps.ReplicaSet
  364. var err error
  365. controllerRef := metav1.GetControllerOf(pod)
  366. if controllerRef == nil {
  367. // No controller owns this Pod.
  368. return nil
  369. }
  370. if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
  371. // Not a pod owned by a replica set.
  372. return nil
  373. }
  374. rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
  375. if err != nil || rs.UID != controllerRef.UID {
  376. klog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
  377. return nil
  378. }
  379. // Now find the Deployment that owns that ReplicaSet.
  380. controllerRef = metav1.GetControllerOf(rs)
  381. if controllerRef == nil {
  382. return nil
  383. }
  384. return dc.resolveControllerRef(rs.Namespace, controllerRef)
  385. }
  386. // resolveControllerRef returns the controller referenced by a ControllerRef,
  387. // or nil if the ControllerRef could not be resolved to a matching controller
  388. // of the correct Kind.
  389. func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
  390. // We can't look up by UID, so look up by Name and then verify UID.
  391. // Don't even try to look up by Name if it's the wrong Kind.
  392. if controllerRef.Kind != controllerKind.Kind {
  393. return nil
  394. }
  395. d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
  396. if err != nil {
  397. return nil
  398. }
  399. if d.UID != controllerRef.UID {
  400. // The controller we found with this Name is not the same one that the
  401. // ControllerRef points to.
  402. return nil
  403. }
  404. return d
  405. }
  406. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  407. // It enforces that the syncHandler is never invoked concurrently with the same key.
  408. func (dc *DeploymentController) worker() {
  409. for dc.processNextWorkItem() {
  410. }
  411. }
  412. func (dc *DeploymentController) processNextWorkItem() bool {
  413. key, quit := dc.queue.Get()
  414. if quit {
  415. return false
  416. }
  417. defer dc.queue.Done(key)
  418. err := dc.syncHandler(key.(string))
  419. dc.handleErr(err, key)
  420. return true
  421. }
  422. func (dc *DeploymentController) handleErr(err error, key interface{}) {
  423. if err == nil {
  424. dc.queue.Forget(key)
  425. return
  426. }
  427. if dc.queue.NumRequeues(key) < maxRetries {
  428. klog.V(2).Infof("Error syncing deployment %v: %v", key, err)
  429. dc.queue.AddRateLimited(key)
  430. return
  431. }
  432. utilruntime.HandleError(err)
  433. klog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
  434. dc.queue.Forget(key)
  435. }
  436. // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
  437. // ControllerRef by adopting and orphaning.
  438. // It returns the list of ReplicaSets that this Deployment should manage.
  439. func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {
  440. // List all ReplicaSets to find those we own but that no longer match our
  441. // selector. They will be orphaned by ClaimReplicaSets().
  442. rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
  443. if err != nil {
  444. return nil, err
  445. }
  446. deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
  447. if err != nil {
  448. return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
  449. }
  450. // If any adoptions are attempted, we should first recheck for deletion with
  451. // an uncached quorum read sometime after listing ReplicaSets (see #42639).
  452. canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
  453. fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
  454. if err != nil {
  455. return nil, err
  456. }
  457. if fresh.UID != d.UID {
  458. return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
  459. }
  460. return fresh, nil
  461. })
  462. cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
  463. return cm.ClaimReplicaSets(rsList)
  464. }
  465. // getPodMapForDeployment returns the Pods managed by a Deployment.
  466. //
  467. // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
  468. // according to the Pod's ControllerRef.
  469. func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID]*v1.PodList, error) {
  470. // Get all Pods that potentially belong to this Deployment.
  471. selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
  472. if err != nil {
  473. return nil, err
  474. }
  475. pods, err := dc.podLister.Pods(d.Namespace).List(selector)
  476. if err != nil {
  477. return nil, err
  478. }
  479. // Group Pods by their controller (if it's in rsList).
  480. podMap := make(map[types.UID]*v1.PodList, len(rsList))
  481. for _, rs := range rsList {
  482. podMap[rs.UID] = &v1.PodList{}
  483. }
  484. for _, pod := range pods {
  485. // Do not ignore inactive Pods because Recreate Deployments need to verify that no
  486. // Pods from older versions are running before spinning up new Pods.
  487. controllerRef := metav1.GetControllerOf(pod)
  488. if controllerRef == nil {
  489. continue
  490. }
  491. // Only append if we care about this UID.
  492. if podList, ok := podMap[controllerRef.UID]; ok {
  493. podList.Items = append(podList.Items, *pod)
  494. }
  495. }
  496. return podMap, nil
  497. }
  498. // syncDeployment will sync the deployment with the given key.
  499. // This function is not meant to be invoked concurrently with the same key.
  500. func (dc *DeploymentController) syncDeployment(key string) error {
  501. startTime := time.Now()
  502. klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
  503. defer func() {
  504. klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
  505. }()
  506. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  507. if err != nil {
  508. return err
  509. }
  510. deployment, err := dc.dLister.Deployments(namespace).Get(name)
  511. if errors.IsNotFound(err) {
  512. klog.V(2).Infof("Deployment %v has been deleted", key)
  513. return nil
  514. }
  515. if err != nil {
  516. return err
  517. }
  518. // Deep-copy otherwise we are mutating our cache.
  519. // TODO: Deep-copy only when needed.
  520. d := deployment.DeepCopy()
  521. everything := metav1.LabelSelector{}
  522. if reflect.DeepEqual(d.Spec.Selector, &everything) {
  523. dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
  524. if d.Status.ObservedGeneration < d.Generation {
  525. d.Status.ObservedGeneration = d.Generation
  526. dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
  527. }
  528. return nil
  529. }
  530. // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
  531. // through adoption/orphaning.
  532. rsList, err := dc.getReplicaSetsForDeployment(d)
  533. if err != nil {
  534. return err
  535. }
  536. // List all Pods owned by this Deployment, grouped by their ReplicaSet.
  537. // Current uses of the podMap are:
  538. //
  539. // * check if a Pod is labeled correctly with the pod-template-hash label.
  540. // * check that no old Pods are running in the middle of Recreate Deployments.
  541. podMap, err := dc.getPodMapForDeployment(d, rsList)
  542. if err != nil {
  543. return err
  544. }
  545. if d.DeletionTimestamp != nil {
  546. return dc.syncStatusOnly(d, rsList)
  547. }
  548. // Update deployment conditions with an Unknown condition when pausing/resuming
  549. // a deployment. In this way, we can be sure that we won't timeout when a user
  550. // resumes a Deployment with a set progressDeadlineSeconds.
  551. if err = dc.checkPausedConditions(d); err != nil {
  552. return err
  553. }
  554. if d.Spec.Paused {
  555. return dc.sync(d, rsList)
  556. }
  557. // rollback is not re-entrant in case the underlying replica sets are updated with a new
  558. // revision so we should ensure that we won't proceed to update replica sets until we
  559. // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
  560. if getRollbackTo(d) != nil {
  561. return dc.rollback(d, rsList)
  562. }
  563. scalingEvent, err := dc.isScalingEvent(d, rsList)
  564. if err != nil {
  565. return err
  566. }
  567. if scalingEvent {
  568. return dc.sync(d, rsList)
  569. }
  570. switch d.Spec.Strategy.Type {
  571. case apps.RecreateDeploymentStrategyType:
  572. return dc.rolloutRecreate(d, rsList, podMap)
  573. case apps.RollingUpdateDeploymentStrategyType:
  574. return dc.rolloutRolling(d, rsList)
  575. }
  576. return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
  577. }