/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// ### ATTENTION ###
//
// This code implements both ReplicaSet and ReplicationController.
//
// For RC, the objects are converted on the way in and out (see ../replication/),
// as if ReplicationController were just an older API version of ReplicaSet.
// However, RC and RS still have separate storage and separate instantiations
// of the ReplicaSetController object.
//
// Use rsc.Kind in log messages rather than hard-coding "ReplicaSet".
package replicaset
import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/prometheus/ratelimiter"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/utils/integer"
)

const (
	// BurstReplicas is a realistic value of the burstReplica field for the replica set
	// manager, based on performance requirements for Kubernetes 1.0.
	BurstReplicas = 500

	// statusUpdateRetries is the number of times we retry updating a ReplicaSet's status.
	statusUpdateRetries = 1
)

// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
	// GroupVersionKind indicates the controller type.
	// Different instances of this struct may handle different GVKs.
	// For example, this struct can be used (with adapters) to handle ReplicationController.
	schema.GroupVersionKind

	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// A ReplicaSet is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicaSet for testing.
	syncHandler func(rsKey string) error

	// A TTLCache of pod creates/deletes each ReplicaSet expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController.
	rsLister appslisters.ReplicaSetLister
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced

	// A store of pods, populated by the shared informer passed to NewReplicaSetController.
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Controllers that need to be synced.
	queue workqueue.RateLimitingInterface
}

// NewReplicaSetController configures a replica set controller with the specified event recorder.
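//
// A minimal wiring sketch (not part of this file); it assumes a client-go
// shared informer factory plus a kubeClient, resyncPeriod, stopCh, and worker
// count chosen by the caller:
//
//	factory := informers.NewSharedInformerFactory(kubeClient, resyncPeriod)
//	rsc := replicaset.NewReplicaSetController(
//		factory.Apps().V1().ReplicaSets(),
//		factory.Core().V1().Pods(),
//		kubeClient,
//		replicaset.BurstReplicas,
//	)
//	factory.Start(stopCh)
//	go rsc.Run(workers, stopCh)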
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
	)
}

// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.addRS,
		UpdateFunc: rsc.updateRS,
		DeleteFunc: rsc.deleteRS,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, e.g. host assignment. Though this might seem like
		// overkill, the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}

// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
}

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)

	if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}

// getReplicaSetsWithSameController returns a list of ReplicaSets with the same
// owner as the given ReplicaSet.
func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet {
	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
		return nil
	}

	allRSs, err := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(err)
		return nil
	}

	var relatedRSs []*apps.ReplicaSet
	for _, r := range allRSs {
		if ref := metav1.GetControllerOf(r); ref != nil && ref.UID == controllerRef.UID {
			relatedRSs = append(relatedRSs, r)
		}
	}

	if klog.V(2) {
		var related string
		if len(relatedRSs) > 0 {
			var relatedNames []string
			for _, r := range relatedRSs {
				relatedNames = append(relatedNames, r.Name)
			}
			related = ": " + strings.Join(relatedNames, ", ")
		}
		klog.Infof("Found %d related %vs for %v %s/%s%s", len(relatedRSs), rsc.Kind, rsc.Kind, rs.Namespace, rs.Name, related)
	}

	return relatedRSs
}

// getPodReplicaSets returns a list of ReplicaSets matching the given pod.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
	rss, err := rsc.rsLister.GetPodReplicaSets(pod)
	if err != nil {
		return nil
	}
	if len(rss) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
	}
	return rss
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != rsc.Kind {
		return nil
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if rs.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return rs
}
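
// enqueueRS adds the given ReplicaSet's key to the work queue.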
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
	key, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}

	rsc.queue.Add(key)
}
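
// enqueueRSAfter adds the given ReplicaSet's key to the work queue after the given duration.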
func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
	key, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}

	rsc.queue.AddAfter(key, duration)
}
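
// addRS is the add event handler for ReplicaSets; it enqueues the new ReplicaSet.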
func (rsc *ReplicaSetController) addRS(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)
	klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
	rsc.enqueueRS(rs)
}

// updateRS is the update event handler for ReplicaSets.
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
	oldRS := old.(*apps.ReplicaSet)
	curRS := cur.(*apps.ReplicaSet)

	// TODO: make a KEP and fix informers to always call the delete event handler on re-create.
	if curRS.UID != oldRS.UID {
		key, err := controller.KeyFunc(oldRS)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
			return
		}
		rsc.deleteRS(cache.DeletedFinalStateUnknown{
			Key: key,
			Obj: oldRS,
		})
	}

	// You might imagine that we only really need to enqueue the
	// replica set when Spec changes, but it is safer to sync any
	// time this function is triggered. That way a full informer
	// resync can requeue any replica sets that don't yet have pods
	// but whose last attempts at creating a pod have failed (since
	// we don't block on creation of pods) instead of those
	// replica sets stalling indefinitely. Enqueueing every time
	// does result in some spurious syncs (like when Status.Replicas
	// is updated and the watch notification from it retriggers
	// this function), but in general extra resyncs shouldn't be
	// that bad as ReplicaSets that haven't met expectations yet won't
	// sync, and all the listing is done using local stores.
	if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
		klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
	}
	rsc.enqueueRS(curRS)
}
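
// deleteRS is the delete event handler for ReplicaSets. obj may be a
// *apps.ReplicaSet or a cache.DeletedFinalStateUnknown tombstone wrapping one.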
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
	}

	key, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}

	klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)

	// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean.
	rsc.expectations.DeleteExpectations(key)

	rsc.queue.Add(key)
}

// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for a new pod to show up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		rsc.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		if rs == nil {
			return
		}
		rsKey, err := controller.KeyFunc(rs)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
		rsc.expectations.CreationObserved(rsKey)
		rsc.queue.Add(rsKey)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	rss := rsc.getPodReplicaSets(pod)
	if len(rss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
	for _, rs := range rss {
		rsc.enqueueRS(rs)
	}
}

// When a pod is updated, figure out what replica set(s) manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new replica sets. old and cur must be *v1.Pod types.
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
		// an rs never initiates a phase change, and so is never asleep waiting for the same.
		rsc.deletePod(curPod)
		if labelChanged {
			// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
			rsc.deletePod(oldPod)
		}
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
			rsc.enqueueRS(rs)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		if rs == nil {
			return
		}
		klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
		rsc.enqueueRS(rs)
		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
		// the Pod status which in turn will trigger a requeue of the owning replica set thus
		// having its status updated with the newly available replica. For now, we can fake the
		// update by resyncing the controller MinReadySeconds after it is requeued because
		// a Pod transitioned to Ready.
		// Note that this still suffers from #29229, we are just moving the problem one level
		// "closer" to kubelet (from the deployment to the replica set controller).
		if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
			klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
			// Add a second to avoid milliseconds skew in AddAfter.
			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
			rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	if labelChanged || controllerRefChanged {
		rss := rsc.getPodReplicaSets(curPod)
		if len(rss) == 0 {
			return
		}
		klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
		for _, rs := range rss {
			rsc.enqueueRS(rs)
		}
	}
}

// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletedFinalStateUnknown marker item.
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new ReplicaSet will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
	if rs == nil {
		return
	}
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
		return
	}
	klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
	rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
	rsc.queue.Add(rsKey)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
	for rsc.processNextWorkItem() {
	}
}
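
// processNextWorkItem pops a single key off the work queue, invokes the sync
// handler for it, and either forgets the key (on success) or re-adds it with
// rate limiting (on error). It returns false only when the queue is shutting down.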
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)

	return true
}

// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
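//
// For example (illustrative numbers only): with 3 pods in filteredPods and
// *rs.Spec.Replicas of 5, diff is -2 and two pods are created; with 5 pods and
// *rs.Spec.Replicas of 3, diff is 2 and two pods are selected for deletion.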
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					// if the namespace is being terminated, we don't have to do
					// anything because any creation will fail
					return nil
				}
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod.
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
		utilruntime.HandleError(err)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion.
					podKey := controller.PodKey(targetPod)
					klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	return nil
}

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}

	// List all pods to include the pods that don't match the rs's selector
	// anymore but have a stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(allPods)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always update the status as pods come up or die.
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		// Multiple things could lead to this update failing. Requeuing the replica set ensures
		// we retry; returning an error causes a requeue without forcing a hotloop.
		return err
	}
	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
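
// claimPods adopts or releases pods for the given ReplicaSet, matching them
// against selector. Before any adoption it re-checks, with an uncached read,
// that the ReplicaSet still exists and is not being deleted.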
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != rs.UID {
			return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
	return cm.ClaimPods(filteredPods)
}

// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
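//
// For example, with count=7 and initialBatchSize=1, the batch sizes are 1, 2,
// and then 4, assuming every call succeeds.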
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

// getIndirectlyRelatedPods returns all pods that are owned by any ReplicaSet
// that is owned by the given ReplicaSet's owner.
func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
	var relatedPods []*v1.Pod
	seen := make(map[types.UID]*apps.ReplicaSet)
	for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {
		selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
		if err != nil {
			return nil, err
		}
		pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
		if err != nil {
			return nil, err
		}
		for _, pod := range pods {
			if otherRS, found := seen[pod.UID]; found {
				klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)
				continue
			}
			seen[pod.UID] = relatedRS
			relatedPods = append(relatedPods, pod)
		}
	}
	if klog.V(4) {
		var relatedNames []string
		for _, related := range relatedPods {
			relatedNames = append(relatedNames, related.Name)
		}
		klog.Infof("Found %d related pods for %v %s/%s: %v", len(relatedPods), rsc.Kind, rs.Namespace, rs.Name, strings.Join(relatedNames, ", "))
	}
	return relatedPods, nil
}
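
// getPodsToDelete returns up to diff pods from filteredPods, preferring pods
// in earlier phases of startup and pods whose nodes host more related pods
// (as ranked by getPodsRankedByRelatedPodsOnSameNode).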
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so no need to handle the > case.
	if diff < len(filteredPods) {
		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(podsWithRanks)
	}
	return filteredPods[:diff]
}

// getPodsRankedByRelatedPodsOnSameNode returns an ActivePodsWithRanks value
// that wraps podsToRank and assigns each pod a rank equal to the number of
// active pods in relatedPods that are colocated on the same node with the pod.
// relatedPods generally should be a superset of podsToRank.
func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
	podsOnNode := make(map[string]int)
	for _, pod := range relatedPods {
		if controller.IsPodActive(pod) {
			podsOnNode[pod.Spec.NodeName]++
		}
	}
	ranks := make([]int, len(podsToRank))
	for i, pod := range podsToRank {
		ranks[i] = podsOnNode[pod.Spec.NodeName]
	}
	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
}
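
// getPodKeys returns the namespace/name keys of the given pods.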
func getPodKeys(pods []*v1.Pod) []string {
	podKeys := make([]string, 0, len(pods))
	for _, pod := range pods {
		podKeys = append(podKeys, controller.PodKey(pod))
	}
	return podKeys
}