/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"context"
	"fmt"
	"time"

	apps "k8s.io/api/apps/v1beta1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	policy "k8s.io/api/policy/v1beta1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	policyinformers "k8s.io/client-go/informers/policy/v1beta1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1beta1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
)

// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time, it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (1-2 seconds), so the value below should
// be more than enough.
// If the controller is running on a different node, it is important that the two nodes have their
// clocks synchronized (e.g. via NTP); otherwise the PodDisruptionBudget controller may not provide
// enough protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second

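// updater writes an updated PodDisruptionBudget (typically its status) back to
// the API server. It is indirected through getUpdater, which by default
// returns writePdbStatus.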
type updater func(*policy.PodDisruptionBudget) error

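// DisruptionController watches pods and PodDisruptionBudgets and keeps each
// PDB's status up to date: it counts the healthy pods matching the PDB's
// selector, determines the expected number of pods from the pods' controllers,
// and records how many disruptions are currently allowed.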
type DisruptionController struct {
	kubeClient      clientset.Interface
	mapper          apimeta.RESTMapper
	scaleNamespacer scaleclient.ScalesGetter

	pdbLister       policylisters.PodDisruptionBudgetLister
	pdbListerSynced cache.InformerSynced

	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	rcLister       corelisters.ReplicationControllerLister
	rcListerSynced cache.InformerSynced

	rsLister       appsv1listers.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	dLister       appsv1listers.DeploymentLister
	dListerSynced cache.InformerSynced

	ssLister       appsv1listers.StatefulSetLister
	ssListerSynced cache.InformerSynced

	// PodDisruptionBudget keys that need to be synced.
	queue        workqueue.RateLimitingInterface
	recheckQueue workqueue.DelayingInterface

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater
}

// controllerAndScale is used to return (controller, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps a pod to a list of
// controllers and their scale.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)

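// NewDisruptionController returns a DisruptionController wired to the given
// informers, client, REST mapper, and scale client, with event handlers
// registered on the pod and PDB informers.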
func NewDisruptionController(
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
) *DisruptionController {
	dc := &DisruptionController{
		kubeClient:   kubeClient,
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
		recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
		broadcaster:  record.NewBroadcaster(),
	}
	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addPod,
		UpdateFunc: dc.updatePod,
		DeleteFunc: dc.deletePod,
	})
	dc.podLister = podInformer.Lister()
	dc.podListerSynced = podInformer.Informer().HasSynced

	pdbInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDb,
			UpdateFunc: dc.updateDb,
			DeleteFunc: dc.removeDb,
		},
		30*time.Second,
	)
	dc.pdbLister = pdbInformer.Lister()
	dc.pdbListerSynced = pdbInformer.Informer().HasSynced

	dc.rcLister = rcInformer.Lister()
	dc.rcListerSynced = rcInformer.Informer().HasSynced

	dc.rsLister = rsInformer.Lister()
	dc.rsListerSynced = rsInformer.Informer().HasSynced

	dc.dLister = dInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced

	dc.ssLister = ssInformer.Lister()
	dc.ssListerSynced = ssInformer.Informer().HasSynced

	dc.mapper = restMapper
	dc.scaleNamespacer = scaleNamespacer

	return dc
}

// The workload resources do implement the scale subresource, so it would
// be possible to only check the scale subresource here. But since there is no
// way to take advantage of listers with scale subresources, we use the workload
// resources directly and only fall back to the scale subresource when needed.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
		dc.getPodStatefulSet, dc.getScaleController}
}

var (
	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
		// Skip RS if it's controlled by a Deployment.
		return nil, nil
	}
	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}

// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
	if !ok || err != nil {
		return nil, err
	}
	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if ss.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
}

// getPodDeployment finds the deployment that manages a pod's replicaset, for
// replicasets that are themselves controlled by a deployment.
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil, nil
	}
	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if deployment.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}

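// getPodReplicationController returns the replication controller referenced by
// the provided controllerRef.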
func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
	if !ok || err != nil {
		return nil, err
	}
	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rc.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}

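// getScaleController resolves controllers that are not one of the built-in
// workload types by querying their scale subresource via the scale client.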
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return nil, err
	}

	gk := schema.GroupKind{
		Group: gv.Group,
		Kind:  controllerRef.Kind,
	}

	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
	if err != nil {
		return nil, err
	}
	gr := mapping.Resource.GroupResource()
	scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	if scale.UID != controllerRef.UID {
		return nil, nil
	}

	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}

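// verifyGroupKind reports whether controllerRef matches the expected kind and
// one of the expected API groups.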
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return false, err
	}

	if controllerRef.Kind != expectedKind {
		return false, nil
	}

	for _, group := range expectedGroups {
		if group == gv.Group {
			return true, nil
		}
	}

	return false, nil
}

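// Run starts the controller: it waits for the informer caches to sync, starts
// recording events (when a client is configured), and runs the sync and
// recheck workers until stopCh is closed.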
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting disruption controller")
	defer klog.Infof("Shutting down disruption controller")

	if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
		return
	}

	if dc.kubeClient != nil {
		klog.Infof("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
	} else {
		klog.Infof("No api server defined - no events will be sent to API server.")
	}

	go wait.Until(dc.worker, time.Second, stopCh)
	go wait.Until(dc.recheckWorker, time.Second, stopCh)

	<-stopCh
}

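// addDb, updateDb, and removeDb are the PodDisruptionBudget informer event
// handlers; each one enqueues the affected PDB for a sync.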
func (dc *DisruptionController) addDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("add DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updateDb(old, cur interface{}) {
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
	pdb := cur.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("update DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) removeDb(obj interface{}) {
	pdb, ok := obj.(*policy.PodDisruptionBudget)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pdb %+v", obj)
			return
		}
	}
	klog.V(4).Infof("remove DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

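// addPod, updatePod, and deletePod are the pod informer event handlers; each
// one looks up the PDB covering the pod (if any) and enqueues it.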
func (dc *DisruptionController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(4).Infof("addPod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
	pod := cur.(*v1.Pod)
	klog.V(4).Infof("updatePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new ReplicaSet will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	klog.V(4).Infof("deletePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

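// enqueuePdb adds the PDB's key to the main work queue.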
func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.queue.Add(key)
}

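// enqueuePdbForRecheck adds the PDB's key to the recheck queue after the given
// delay, so the PDB is re-synced once pending disruptions are expected to have
// completed.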
func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found. We don't return that as an error to the
	// caller.
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		klog.Warning(msg)
		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return pdbs[0]
}

// This function returns pods using the PodDisruptionBudget object.
// IMPORTANT NOTE: the returned pods should NOT be modified.
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	// Check the error before using the selector; on a parse error the selector
	// may be nil.
	if err != nil {
		return []*v1.Pod{}, err
	}
	if sel.Empty() {
		return []*v1.Pod{}, nil
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*v1.Pod{}, err
	}
	return pods, nil
}

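// worker processes items from the main work queue until the queue is shut down.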
func (dc *DisruptionController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DisruptionController) processNextWorkItem() bool {
	dKey, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(dKey)

	err := dc.sync(dKey.(string))
	if err == nil {
		dc.queue.Forget(dKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
	dc.queue.AddRateLimited(dKey)

	return true
}

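// recheckWorker drains the recheck queue, moving each delayed key back onto
// the main work queue so the PDB is synced again.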
func (dc *DisruptionController) recheckWorker() {
	for dc.processNextRecheckWorkItem() {
	}
}

func (dc *DisruptionController) processNextRecheckWorkItem() bool {
	dKey, quit := dc.recheckQueue.Get()
	if quit {
		return false
	}
	defer dc.recheckQueue.Done(dKey)
	dc.queue.AddRateLimited(dKey)
	return true
}

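// sync fetches the PDB identified by key and attempts to bring its status up
// to date; on any non-conflict failure it falls back to failSafe, which sets
// DisruptionsAllowed to 0.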
func (dc *DisruptionController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	err = dc.trySync(pdb)
	// If the reason for failure was a conflict, then allow this PDB update to be
	// requeued without triggering the failSafe logic.
	if errors.IsConflict(err) {
		return err
	}
	if err != nil {
		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
		return dc.failSafe(pdb)
	}

	return nil
}

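// trySync computes the PDB's current and desired healthy pod counts, rebuilds
// the DisruptedPods map, writes the resulting status, and schedules a recheck
// if any disruption is still pending.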
func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
		return err
	}
	if len(pods) == 0 {
		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
	}

	expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
		return err
	}

	currentTime := time.Now()
	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

	if err == nil && recheckTime != nil {
		// There is always at most one PDB waiting with a particular name in the queue,
		// and each PDB in the queue is associated with the lowest timestamp
		// that was supplied when a PDB with that name was added.
		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
	}
	return err
}

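// getExpectedPodCount derives the expected number of pods and the desired
// number of healthy pods from the PDB's maxUnavailable or minAvailable field,
// consulting the pods' controllers for their scale when a percentage is used.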
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, err error) {
	err = nil
	// TODO(davidopp): consider handling expectedCount and the rules about
	// permitted controller configurations (specifically, treating it as an error
	// if a pod covered by a PDB has 0 controllers or more than 1 controller) the
	// same way for integer and percentage minAvailable
	if pdb.Spec.MaxUnavailable != nil {
		expectedCount, err = dc.getExpectedScale(pdb, pods)
		if err != nil {
			return
		}
		var maxUnavailable int
		maxUnavailable, err = intstr.GetValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = expectedCount - int32(maxUnavailable)
		if desiredHealthy < 0 {
			desiredHealthy = 0
		}
	} else if pdb.Spec.MinAvailable != nil {
		if pdb.Spec.MinAvailable.Type == intstr.Int {
			desiredHealthy = pdb.Spec.MinAvailable.IntVal
			expectedCount = int32(len(pods))
		} else if pdb.Spec.MinAvailable.Type == intstr.String {
			expectedCount, err = dc.getExpectedScale(pdb, pods)
			if err != nil {
				return
			}
			var minAvailable int
			minAvailable, err = intstr.GetValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
			if err != nil {
				return
			}
			desiredHealthy = int32(minAvailable)
		}
	}
	return
}

func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, err error) {
	// When the user specifies a fraction of pods that must be available, we
	// use as the fraction's denominator
	// SUM_{all c in C} scale(c)
	// where C is the union of C_p1, C_p2, ..., C_pN
	// and each C_pi is the set of controllers controlling the pod pi

	// k8s only defines what will happen when 0 or 1 controllers control a
	// given pod. We explicitly exclude the 0 controllers case here, and we
	// report an error if we find a pod with more than 1 controller. Thus in
	// practice each C_pi is a set of exactly 1 controller.

	// A mapping from controllers to their scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod. If any pod has 0 controllers,
	// that's an error. With ControllerRef, a pod can only have 1 controller.
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			err = fmt.Errorf("found no controller ref for pod %q", pod.Name)
			dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error())
			return
		}

		// If we already know the scale of the controller there is no need to do anything.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range dc.finders() {
			var controllerNScale *controllerAndScale
			controllerNScale, err = finder(controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllers", err.Error())
			return
		}
	}

	// 2. Add up all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

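// countHealthyPods returns the number of pods that are ready and are neither
// being deleted nor already recorded as disrupted within DeletionTimeout.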
func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
Pod:
	for _, pod := range pods {
		// Pod is being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		// Pod is expected to be deleted soon.
		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
			continue
		}
		if podutil.IsPodReady(pod) {
			currentHealthy++
			continue Pod
		}
	}

	return
}

// buildDisruptedPodMap builds a new DisruptedPods map, dropping entries for
// pods that no longer exist, are already being deleted, or were not deleted
// within DeletionTimeout despite being marked as disrupted. It also returns
// the time at which this check should be repeated, if any.
func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := pdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil {
		return result, recheckTime
	}
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod not on the list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
				pdb.Namespace, pdb.Name)
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// failSafe is an attempt to at least update the DisruptionsAllowed field to
// 0 if everything else has failed. This is one place we
// implement the "fail open" part of the design since if we manage to update
// this field correctly, we will prevent the /evict handler from approving an
// eviction when it may be unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error {
	newPdb := pdb.DeepCopy()
	newPdb.Status.DisruptionsAllowed = 0
	return dc.getUpdater()(newPdb)
}

func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
	disruptedPods map[string]metav1.Time) error {

	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet. This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionsAllowed := currentHealthy - desiredHealthy
	if expectedCount <= 0 || disruptionsAllowed <= 0 {
		disruptionsAllowed = 0
	}

	if pdb.Status.CurrentHealthy == currentHealthy &&
		pdb.Status.DesiredHealthy == desiredHealthy &&
		pdb.Status.ExpectedPods == expectedCount &&
		pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
		pdb.Status.ObservedGeneration == pdb.Generation {
		return nil
	}

	newPdb := pdb.DeepCopy()
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:     currentHealthy,
		DesiredHealthy:     desiredHealthy,
		ExpectedPods:       expectedCount,
		DisruptionsAllowed: disruptionsAllowed,
		DisruptedPods:      disruptedPods,
		ObservedGeneration: pdb.Generation,
	}

	return dc.getUpdater()(newPdb)
}

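// writePdbStatus persists the PDB's status subresource to the API server.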
func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
	// If this update fails, don't retry it. Allow the failure to get handled &
	// retried in `processNextWorkItem()`.
	_, err := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(context.TODO(), pdb, metav1.UpdateOptions{})
	return err
}