disruption.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"fmt"
	"time"

	apps "k8s.io/api/apps/v1beta1"
	"k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	policy "k8s.io/api/policy/v1beta1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	policyinformers "k8s.io/client-go/informers/policy/v1beta1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	policyclientset "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1beta1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
)

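// statusUpdateRetries is the number of times writePdbStatus will re-fetch a
// PDB and retry a status write before giving up.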
const statusUpdateRetries = 2

// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod is not marked for deletion during that time, it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (1-2 seconds), so the value below should
// be more than enough.
// If the controller is running on a different node, it is important that the two nodes have synchronized
// clocks (e.g. via NTP); otherwise the PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second

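// updater is the function used to persist a PDB status update; it is obtained
// via getUpdater, which allows the write path to be swapped out (for example
// in tests).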
type updater func(*policy.PodDisruptionBudget) error

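// DisruptionController watches pods and PodDisruptionBudgets and keeps each
// PDB's status (ExpectedPods, CurrentHealthy, DesiredHealthy, DisruptedPods,
// PodDisruptionsAllowed and ObservedGeneration) up to date, so that the
// eviction subresource can decide whether a voluntary disruption is currently
// allowed.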
type DisruptionController struct {
	kubeClient      clientset.Interface
	mapper          apimeta.RESTMapper
	scaleNamespacer scaleclient.ScalesGetter

	pdbLister       policylisters.PodDisruptionBudgetLister
	pdbListerSynced cache.InformerSynced

	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	rcLister       corelisters.ReplicationControllerLister
	rcListerSynced cache.InformerSynced

	rsLister       appsv1listers.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	dLister       appsv1listers.DeploymentLister
	dListerSynced cache.InformerSynced

	ssLister       appsv1listers.StatefulSetLister
	ssListerSynced cache.InformerSynced

	// PodDisruptionBudget keys that need to be synced.
	queue        workqueue.RateLimitingInterface
	recheckQueue workqueue.DelayingInterface

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater
}

// controllerAndScale is used to return (controller, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps an owner reference in a
// given namespace to the matching controller and its scale, or nil if the
// reference does not match.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)

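// NewDisruptionController builds a DisruptionController wired to the given
// informers, client, RESTMapper and scale client, and registers the event
// handlers that enqueue PDBs whenever pods or PDBs change.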
func NewDisruptionController(
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
) *DisruptionController {
	dc := &DisruptionController{
		kubeClient:   kubeClient,
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
		recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
		broadcaster:  record.NewBroadcaster(),
	}
	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addPod,
		UpdateFunc: dc.updatePod,
		DeleteFunc: dc.deletePod,
	})
	dc.podLister = podInformer.Lister()
	dc.podListerSynced = podInformer.Informer().HasSynced

	pdbInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDb,
			UpdateFunc: dc.updateDb,
			DeleteFunc: dc.removeDb,
		},
		30*time.Second,
	)
	dc.pdbLister = pdbInformer.Lister()
	dc.pdbListerSynced = pdbInformer.Informer().HasSynced

	dc.rcLister = rcInformer.Lister()
	dc.rcListerSynced = rcInformer.Informer().HasSynced

	dc.rsLister = rsInformer.Lister()
	dc.rsListerSynced = rsInformer.Informer().HasSynced

	dc.dLister = dInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced

	dc.ssLister = ssInformer.Lister()
	dc.ssListerSynced = ssInformer.Informer().HasSynced

	dc.mapper = restMapper
	dc.scaleNamespacer = scaleNamespacer

	return dc
}

// The workload resources do implement the scale subresource, so it would
// be possible to only check the scale subresource here. But since there is no
// way to take advantage of listers with scale subresources, we use the
// workload resources directly and only fall back to the scale subresource
// when needed.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
		dc.getPodStatefulSet, dc.getScaleController}
}

var (
	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
		// Skip RS if it's controlled by a Deployment.
		return nil, nil
	}
	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}

// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
	if !ok || err != nil {
		return nil, err
	}
	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if ss.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
}

// getPodDeployment finds the deployment for a replicaset which is being
// managed by a deployment.
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil, nil
	}
	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if deployment.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}

func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
	if !ok || err != nil {
		return nil, err
	}
	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rc.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}

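// getScaleController resolves any other workload kind through its /scale
// subresource, using the RESTMapper to map the owner's group/kind to a
// resource.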
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return nil, err
	}

	gk := schema.GroupKind{
		Group: gv.Group,
		Kind:  controllerRef.Kind,
	}

	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
	if err != nil {
		return nil, err
	}
	gr := mapping.Resource.GroupResource()

	scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	if scale.UID != controllerRef.UID {
		return nil, nil
	}

	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}

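// verifyGroupKind reports whether the controllerRef has the expected kind and
// an API group from the expected list.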
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return false, err
	}

	if controllerRef.Kind != expectedKind {
		return false, nil
	}

	for _, group := range expectedGroups {
		if group == gv.Group {
			return true, nil
		}
	}

	return false, nil
}

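// Run waits for the informer caches to sync, starts the sync and recheck
// workers, and blocks until stopCh is closed.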
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting disruption controller")
	defer klog.Infof("Shutting down disruption controller")

	if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
		return
	}

	if dc.kubeClient != nil {
		klog.Infof("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
	} else {
		klog.Infof("No api server defined - no events will be sent to API server.")
	}

	go wait.Until(dc.worker, time.Second, stopCh)
	go wait.Until(dc.recheckWorker, time.Second, stopCh)

	<-stopCh
}

func (dc *DisruptionController) addDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("add DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updateDb(old, cur interface{}) {
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
	pdb := cur.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("update DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) removeDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("remove DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(4).Infof("addPod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
	pod := cur.(*v1.Pod)
	klog.V(4).Infof("updatePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new PDB will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	klog.V(4).Infof("deletePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.queue.Add(key)
}

func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found. We don't return that as an error to the
	// caller.
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		klog.Warning(msg)
		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return pdbs[0]
}

// getPodsForPdb returns the pods whose labels match the PodDisruptionBudget's selector.
// IMPORTANT NOTE: the returned pods should NOT be modified.
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return []*v1.Pod{}, err
	}
	if sel.Empty() {
		return []*v1.Pod{}, nil
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*v1.Pod{}, err
	}
	return pods, nil
}

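// worker runs until the queue shuts down; each iteration syncs one PDB via
// processNextWorkItem and requeues the key with rate limiting if the sync
// fails.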
func (dc *DisruptionController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DisruptionController) processNextWorkItem() bool {
	dKey, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(dKey)

	err := dc.sync(dKey.(string))
	if err == nil {
		dc.queue.Forget(dKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
	dc.queue.AddRateLimited(dKey)

	return true
}

func (dc *DisruptionController) recheckWorker() {
	for dc.processNextRecheckWorkItem() {
	}
}

func (dc *DisruptionController) processNextRecheckWorkItem() bool {
	dKey, quit := dc.recheckQueue.Get()
	if quit {
		return false
	}
	defer dc.recheckQueue.Done(dKey)
	dc.queue.AddRateLimited(dKey)
	return true
}

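// sync fetches the PDB named by key from the lister and reconciles its status;
// if reconciliation fails, the PDB is driven to a safe state via failSafe.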
func (dc *DisruptionController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	if err := dc.trySync(pdb); err != nil {
		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
		return dc.failSafe(pdb)
	}

	return nil
}

func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
		return err
	}
	if len(pods) == 0 {
		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
	}

	expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
		return err
	}

	currentTime := time.Now()
	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

	if err == nil && recheckTime != nil {
		// There is always at most one PDB waiting with a particular name in the queue,
		// and each PDB in the queue is associated with the lowest timestamp
		// that was supplied when a PDB with that name was added.
		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
	}
	return err
}

func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, err error) {
	err = nil
	// TODO(davidopp): consider making expectedCount, and the rules about
	// permitted controller configurations (specifically, considering it an error
	// if a pod covered by a PDB has 0 controllers or > 1 controller), be
	// handled the same way for integer and percentage minAvailable
	if pdb.Spec.MaxUnavailable != nil {
		expectedCount, err = dc.getExpectedScale(pdb, pods)
		if err != nil {
			return
		}
		var maxUnavailable int
		maxUnavailable, err = intstr.GetValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = expectedCount - int32(maxUnavailable)
		if desiredHealthy < 0 {
			desiredHealthy = 0
		}
	} else if pdb.Spec.MinAvailable != nil {
		if pdb.Spec.MinAvailable.Type == intstr.Int {
			desiredHealthy = pdb.Spec.MinAvailable.IntVal
			expectedCount = int32(len(pods))
		} else if pdb.Spec.MinAvailable.Type == intstr.String {
			expectedCount, err = dc.getExpectedScale(pdb, pods)
			if err != nil {
				return
			}

			var minAvailable int
			minAvailable, err = intstr.GetValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
			if err != nil {
				return
			}
			desiredHealthy = int32(minAvailable)
		}
	}
	return
}

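// For example: with MinAvailable "60%" and all matching pods owned by a single
// ReplicaSet scaled to 5 replicas, getExpectedScale (below) returns
// expectedCount=5, GetValueFromIntOrPercent rounds 60% of 5 up to 3, so
// desiredHealthy=3; with 4 healthy pods, updatePdbStatus then allows 4-3 = 1
// voluntary disruption.
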
func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, err error) {
	// When the user specifies a fraction of pods that must be available, we
	// use as the fraction's denominator
	// SUM_{all c in C} scale(c)
	// where C is the union of C_p1, C_p2, ..., C_pN
	// and each C_pi is the set of controllers controlling the pod pi

	// k8s only defines what happens when 0 or 1 controllers control a
	// given pod. We explicitly exclude the 0-controllers case here, and we
	// report an error if we find a pod with more than 1 controller. Thus in
	// practice each C_pi is a set of exactly 1 controller.

	// A mapping from controllers to their scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod. If any pod has 0 controllers,
	// that's an error. With ControllerRef, a pod can only have 1 controller.
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			err = fmt.Errorf("found no controller ref for pod %q", pod.Name)
			dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error())
			return
		}

		// If we already know the scale of the controller there is no need to do anything.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range dc.finders() {
			var controllerNScale *controllerAndScale
			controllerNScale, err = finder(controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllers", err.Error())
			return
		}
	}

	// 2. Add up all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
Pod:
	for _, pod := range pods {
		// Pod is being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		// Pod is expected to be deleted soon.
		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
			continue
		}
		if podutil.IsPodReady(pod) {
			currentHealthy++
			continue Pod
		}
	}

	return
}

// buildDisruptedPodMap builds a fresh DisruptedPods map, dropping entries for
// pods that no longer exist, are already being deleted, or were not actually
// deleted within DeletionTimeout. It also returns the earliest time at which
// this check should be repeated, if any.
func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := pdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil || len(disruptedPods) == 0 {
		return result, recheckTime
	}
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod not on the list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
				pdb.Namespace, pdb.Name)
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// failSafe is an attempt to at least update the PodDisruptionsAllowed field to
// 0 if everything else has failed. This is one place we implement the
// "fail safe" part of the design: if we manage to update this field correctly,
// we will prevent the /evict handler from approving an eviction when it may be
// unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error {
	newPdb := pdb.DeepCopy()
	newPdb.Status.PodDisruptionsAllowed = 0
	return dc.getUpdater()(newPdb)
}

func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
	disruptedPods map[string]metav1.Time) error {

	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet. This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionsAllowed := currentHealthy - desiredHealthy
	if expectedCount <= 0 || disruptionsAllowed <= 0 {
		disruptionsAllowed = 0
	}

	if pdb.Status.CurrentHealthy == currentHealthy &&
		pdb.Status.DesiredHealthy == desiredHealthy &&
		pdb.Status.ExpectedPods == expectedCount &&
		pdb.Status.PodDisruptionsAllowed == disruptionsAllowed &&
		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
		pdb.Status.ObservedGeneration == pdb.Generation {
		return nil
	}

	newPdb := pdb.DeepCopy()
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:        currentHealthy,
		DesiredHealthy:        desiredHealthy,
		ExpectedPods:          expectedCount,
		PodDisruptionsAllowed: disruptionsAllowed,
		DisruptedPods:         disruptedPods,
		ObservedGeneration:    pdb.Generation,
	}

	return dc.getUpdater()(newPdb)
}

// refresh tries to re-GET the given PDB. If there are any errors, it just
// returns the old PDB. Intended to be used in a retry loop where it runs a
// bounded number of times.
func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
	newPdb, err := pdbClient.Get(pdb.Name, metav1.GetOptions{})
	if err == nil {
		return newPdb
	}
	return pdb
}

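// writePdbStatus copies the freshly computed status onto the PDB and writes it
// with UpdateStatus, retrying up to statusUpdateRetries times against a
// re-fetched copy if a write fails (for example on a resourceVersion conflict).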
func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
	pdbClient := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace)
	st := pdb.Status

	var err error
	for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
		pdb.Status = st
		if _, err = pdbClient.UpdateStatus(pdb); err == nil {
			break
		}
	}

	return err
}
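
// A minimal wiring sketch (illustrative only; client, restMapper, scaleClient
// and stopCh are placeholders, and the real setup lives in
// kube-controller-manager). The factory is a k8s.io/client-go/informers
// SharedInformerFactory:
//
//	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
//	dc := NewDisruptionController(
//		factory.Core().V1().Pods(),
//		factory.Policy().V1beta1().PodDisruptionBudgets(),
//		factory.Core().V1().ReplicationControllers(),
//		factory.Apps().V1().ReplicaSets(),
//		factory.Apps().V1().Deployments(),
//		factory.Apps().V1().StatefulSets(),
//		client,
//		restMapper,
//		scaleClient,
//	)
//	factory.Start(stopCh)
//	go dc.Run(stopCh)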