replica_set.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// ### ATTENTION ###
//
// This code implements both ReplicaSet and ReplicationController.
//
// For RC, the objects are converted on the way in and out (see ../replication/),
// as if ReplicationController were just an older API version of ReplicaSet.
// However, RC and RS still have separate storage and separate instantiations
// of the ReplicaSetController object.
//
// Use rsc.Kind in log messages rather than hard-coding "ReplicaSet".

package replicaset

import (
	"fmt"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/metrics"
	"k8s.io/utils/integer"
)

const (
	// Realistic value of the burstReplica field for the replica set manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// The number of times we retry updating a ReplicaSet's status.
	statusUpdateRetries = 1
)

// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
	// GroupVersionKind indicates the controller type.
	// Different instances of this struct may handle different GVKs.
	// For example, this struct can be used (with adapters) to handle ReplicationController.
	schema.GroupVersionKind

	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// A ReplicaSet is temporarily suspended after creating/deleting this many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicaSet for testing.
	syncHandler func(rsKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
	rsLister appslisters.ReplicaSetLister
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced

	// A store of pods, populated by the shared informer passed to NewReplicaSetController
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface
}

// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
	)
}

// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.enqueueReplicaSet,
		UpdateFunc: rsc.updateRS,
		// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
		// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
		// way of achieving this is by performing a `stop` operation on the replica set.
		DeleteFunc: rsc.enqueueReplicaSet,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet controller for every pod change, e.g. host assignment. Though this might
		// seem like overkill, the most frequent pod update is status, and the associated ReplicaSet will only
		// list from local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}
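
// Example (illustrative sketch, not part of the upstream file): a caller such as a
// controller manager would typically construct the controller from shared informer
// factories and then start it. The names informerFactory, kubeClient, stopCh, and the
// worker count below are assumptions supplied by the caller, not defined in this file.
//
//	rsc := replicaset.NewReplicaSetController(
//		informerFactory.Apps().V1().ReplicaSets(),
//		informerFactory.Core().V1().Pods(),
//		kubeClient,
//		replicaset.BurstReplicas,
//	)
//	informerFactory.Start(stopCh)
//	go rsc.Run(5, stopCh)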

// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
}

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)

	if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}

// getPodReplicaSets returns a list of ReplicaSets matching the given pod.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
	rss, err := rsc.rsLister.GetPodReplicaSets(pod)
	if err != nil {
		return nil
	}
	if len(rss) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
	}
	return rss
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != rsc.Kind {
		return nil
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if rs.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return rs
}

// updateRS is the callback invoked when a ReplicaSet is updated.
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
	oldRS := old.(*apps.ReplicaSet)
	curRS := cur.(*apps.ReplicaSet)

	// You might imagine that we only really need to enqueue the
	// replica set when Spec changes, but it is safer to sync any
	// time this function is triggered. That way a full informer
	// resync can requeue any replica sets that don't yet have pods
	// but whose last attempts at creating a pod have failed (since
	// we don't block on creation of pods) instead of those
	// replica sets stalling indefinitely. Enqueueing every time
	// does result in some spurious syncs (like when Status.Replicas
	// is updated and the watch notification from it retriggers
	// this function), but in general extra resyncs shouldn't be
	// that bad as ReplicaSets that haven't met expectations yet won't
	// sync, and all the listing is done using local stores.
	if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
		klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
	}
	rsc.enqueueReplicaSet(cur)
}

// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		rsc.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		if rs == nil {
			return
		}
		rsKey, err := controller.KeyFunc(rs)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
		rsc.expectations.CreationObserved(rsKey)
		rsc.enqueueReplicaSet(rs)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	rss := rsc.getPodReplicaSets(pod)
	if len(rss) == 0 {
		return
	}
	klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
	for _, rs := range rss {
		rsc.enqueueReplicaSet(rs)
	}
}

// When a pod is updated, figure out what replica set/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new replica set. old and cur must be *v1.Pod types.
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
		// an rs never initiates a phase change, and so is never asleep waiting for the same.
		rsc.deletePod(curPod)
		if labelChanged {
			// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
			rsc.deletePod(oldPod)
		}
		return
	}

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
			rsc.enqueueReplicaSet(rs)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
		if rs == nil {
			return
		}
		klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
		rsc.enqueueReplicaSet(rs)
		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
		// the Pod status which in turn will trigger a requeue of the owning replica set thus
		// having its status updated with the newly available replica. For now, we can fake the
		// update by resyncing the controller MinReadySeconds after it is requeued because
		// a Pod transitioned to Ready.
		// Note that this still suffers from #29229, we are just moving the problem one level
		// "closer" to kubelet (from the deployment to the replica set controller).
		if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
			klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
			// Add a second to avoid milliseconds skew in AddAfter.
			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
			rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
		}
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	if labelChanged || controllerRefChanged {
		rss := rsc.getPodReplicaSets(curPod)
		if len(rss) == 0 {
			return
		}
		klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
		for _, rs := range rss {
			rsc.enqueueReplicaSet(rs)
		}
	}
}

// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new ReplicaSet will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
	if rs == nil {
		return
	}
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		return
	}
	klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
	rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
	rsc.enqueueReplicaSet(rs)
}

// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rsc.queue.Add(key)
}

// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rsc.queue.AddAfter(key, after)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
	for rsc.processNextWorkItem() {
	}
}
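
// processNextWorkItem pulls a single ReplicaSet key off the work queue, invokes the
// sync handler for it, and either forgets the key on success or requeues it with
// rate limiting on failure.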
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)

	return true
}

// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	return nil
}

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
		return nil
	}

	// List all pods to include the pods that don't match the rs's selector
	// anymore but still have the stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(allPods)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		// Multiple things could lead to this update failing. Returning an error
		// causes a requeue without forcing a hotloop.
		return err
	}
	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
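
// claimPods adopts matching orphaned pods and releases owned pods that no longer
// match the ReplicaSet's selector, returning the list of pods this ReplicaSet owns.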
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != rs.UID {
			return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
	return cm.ClaimPods(filteredPods)
}

// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}
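
// getPodsToDelete returns the diff pods that should be deleted, preferring pods
// in earlier stages of startup (not-ready, unscheduled, pending) over pods that
// are already scheduled, running, and ready.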
func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so there is no need to handle the > case.
	if diff < len(filteredPods) {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	return filteredPods[:diff]
}
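
// getPodKeys returns the namespace/name keys of the given pods.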
func getPodKeys(pods []*v1.Pod) []string {
	podKeys := make([]string, 0, len(pods))
	for _, pod := range pods {
		podKeys = append(podKeys, controller.PodKey(pod))
	}
	return podKeys
}