pvc_protection_controller.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pvcprotection
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. apierrs "k8s.io/apimachinery/pkg/api/errors"
  19. "k8s.io/apimachinery/pkg/labels"
  20. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. coreinformers "k8s.io/client-go/informers/core/v1"
  23. clientset "k8s.io/client-go/kubernetes"
  24. corelisters "k8s.io/client-go/listers/core/v1"
  25. "k8s.io/client-go/tools/cache"
  26. "k8s.io/client-go/util/workqueue"
  27. "k8s.io/klog"
  28. "k8s.io/kubernetes/pkg/controller"
  29. "k8s.io/kubernetes/pkg/controller/volume/protectionutil"
  30. "k8s.io/kubernetes/pkg/util/metrics"
  31. "k8s.io/kubernetes/pkg/util/slice"
  32. volumeutil "k8s.io/kubernetes/pkg/volume/util"
  33. )
  34. // Controller is controller that removes PVCProtectionFinalizer
  35. // from PVCs that are used by no pods.
  36. type Controller struct {
  37. client clientset.Interface
  38. pvcLister corelisters.PersistentVolumeClaimLister
  39. pvcListerSynced cache.InformerSynced
  40. podLister corelisters.PodLister
  41. podListerSynced cache.InformerSynced
  42. queue workqueue.RateLimitingInterface
  43. // allows overriding of StorageObjectInUseProtection feature Enabled/Disabled for testing
  44. storageObjectInUseProtectionEnabled bool
  45. }
  46. // NewPVCProtectionController returns a new instance of PVCProtectionController.
  47. func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) *Controller {
  48. e := &Controller{
  49. client: cl,
  50. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
  51. storageObjectInUseProtectionEnabled: storageObjectInUseProtectionFeatureEnabled,
  52. }
  53. if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
  54. metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolumeclaim_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
  55. }
  56. e.pvcLister = pvcInformer.Lister()
  57. e.pvcListerSynced = pvcInformer.Informer().HasSynced
  58. pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  59. AddFunc: e.pvcAddedUpdated,
  60. UpdateFunc: func(old, new interface{}) {
  61. e.pvcAddedUpdated(new)
  62. },
  63. })
  64. e.podLister = podInformer.Lister()
  65. e.podListerSynced = podInformer.Informer().HasSynced
  66. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  67. AddFunc: func(obj interface{}) {
  68. e.podAddedDeletedUpdated(obj, false)
  69. },
  70. DeleteFunc: func(obj interface{}) {
  71. e.podAddedDeletedUpdated(obj, true)
  72. },
  73. UpdateFunc: func(old, new interface{}) {
  74. e.podAddedDeletedUpdated(new, false)
  75. },
  76. })
  77. return e
  78. }
  79. // Run runs the controller goroutines.
  80. func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
  81. defer utilruntime.HandleCrash()
  82. defer c.queue.ShutDown()
  83. klog.Infof("Starting PVC protection controller")
  84. defer klog.Infof("Shutting down PVC protection controller")
  85. if !controller.WaitForCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
  86. return
  87. }
  88. for i := 0; i < workers; i++ {
  89. go wait.Until(c.runWorker, time.Second, stopCh)
  90. }
  91. <-stopCh
  92. }
  93. func (c *Controller) runWorker() {
  94. for c.processNextWorkItem() {
  95. }
  96. }
  97. // processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
  98. func (c *Controller) processNextWorkItem() bool {
  99. pvcKey, quit := c.queue.Get()
  100. if quit {
  101. return false
  102. }
  103. defer c.queue.Done(pvcKey)
  104. pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string))
  105. if err != nil {
  106. utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err))
  107. return true
  108. }
  109. err = c.processPVC(pvcNamespace, pvcName)
  110. if err == nil {
  111. c.queue.Forget(pvcKey)
  112. return true
  113. }
  114. utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %v", pvcKey, err))
  115. c.queue.AddRateLimited(pvcKey)
  116. return true
  117. }
  118. func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
  119. klog.V(4).Infof("Processing PVC %s/%s", pvcNamespace, pvcName)
  120. startTime := time.Now()
  121. defer func() {
  122. klog.V(4).Infof("Finished processing PVC %s/%s (%v)", pvcNamespace, pvcName, time.Since(startTime))
  123. }()
  124. pvc, err := c.pvcLister.PersistentVolumeClaims(pvcNamespace).Get(pvcName)
  125. if apierrs.IsNotFound(err) {
  126. klog.V(4).Infof("PVC %s/%s not found, ignoring", pvcNamespace, pvcName)
  127. return nil
  128. }
  129. if err != nil {
  130. return err
  131. }
  132. if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
  133. // PVC should be deleted. Check if it's used and remove finalizer if
  134. // it's not.
  135. isUsed, err := c.isBeingUsed(pvc)
  136. if err != nil {
  137. return err
  138. }
  139. if !isUsed {
  140. return c.removeFinalizer(pvc)
  141. }
  142. }
  143. if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) {
  144. // PVC is not being deleted -> it should have the finalizer. The
  145. // finalizer should be added by admission plugin, this is just to add
  146. // the finalizer to old PVCs that were created before the admission
  147. // plugin was enabled.
  148. return c.addFinalizer(pvc)
  149. }
  150. return nil
  151. }
  152. func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
  153. // Skip adding Finalizer in case the StorageObjectInUseProtection feature is not enabled
  154. if !c.storageObjectInUseProtectionEnabled {
  155. return nil
  156. }
  157. claimClone := pvc.DeepCopy()
  158. claimClone.ObjectMeta.Finalizers = append(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer)
  159. _, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
  160. if err != nil {
  161. klog.V(3).Infof("Error adding protection finalizer to PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
  162. return err
  163. }
  164. klog.V(3).Infof("Added protection finalizer to PVC %s/%s", pvc.Namespace, pvc.Name)
  165. return nil
  166. }
  167. func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
  168. claimClone := pvc.DeepCopy()
  169. claimClone.ObjectMeta.Finalizers = slice.RemoveString(claimClone.ObjectMeta.Finalizers, volumeutil.PVCProtectionFinalizer, nil)
  170. _, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
  171. if err != nil {
  172. klog.V(3).Infof("Error removing protection finalizer from PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
  173. return err
  174. }
  175. klog.V(3).Infof("Removed protection finalizer from PVC %s/%s", pvc.Namespace, pvc.Name)
  176. return nil
  177. }
  178. func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
  179. pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
  180. if err != nil {
  181. return false, err
  182. }
  183. for _, pod := range pods {
  184. if pod.Spec.NodeName == "" {
  185. // This pod is not scheduled. We have a predicated in scheduler that
  186. // prevents scheduling pods with deletion timestamp, so we can be
  187. // pretty sure it won't be scheduled in parallel to this check.
  188. // Therefore this pod does not block the PVC from deletion.
  189. klog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
  190. continue
  191. }
  192. for _, volume := range pod.Spec.Volumes {
  193. if volume.PersistentVolumeClaim == nil {
  194. continue
  195. }
  196. if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
  197. klog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name)
  198. return true, nil
  199. }
  200. }
  201. }
  202. klog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
  203. return false, nil
  204. }
  205. // pvcAddedUpdated reacts to pvc added/updated/deleted events
  206. func (c *Controller) pvcAddedUpdated(obj interface{}) {
  207. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  208. if !ok {
  209. utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
  210. return
  211. }
  212. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
  213. if err != nil {
  214. utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err))
  215. return
  216. }
  217. klog.V(4).Infof("Got event on PVC %s", key)
  218. if protectionutil.NeedToAddFinalizer(pvc, volumeutil.PVCProtectionFinalizer) || protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
  219. c.queue.Add(key)
  220. }
  221. }
  222. // podAddedDeletedUpdated reacts to Pod events
  223. func (c *Controller) podAddedDeletedUpdated(obj interface{}, deleted bool) {
  224. pod, ok := obj.(*v1.Pod)
  225. if !ok {
  226. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  227. if !ok {
  228. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  229. return
  230. }
  231. pod, ok = tombstone.Obj.(*v1.Pod)
  232. if !ok {
  233. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj))
  234. return
  235. }
  236. }
  237. // Filter out pods that can't help us to remove a finalizer on PVC
  238. if !deleted && !volumeutil.IsPodTerminated(pod, pod.Status) && pod.Spec.NodeName != "" {
  239. return
  240. }
  241. klog.V(4).Infof("Got event on pod %s/%s", pod.Namespace, pod.Name)
  242. // Enqueue all PVCs that the pod uses
  243. for _, volume := range pod.Spec.Volumes {
  244. if volume.PersistentVolumeClaim != nil {
  245. c.queue.Add(pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName)
  246. }
  247. }
  248. }