tokencleaner.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package bootstrap
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. apierrors "k8s.io/apimachinery/pkg/api/errors"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. coreinformers "k8s.io/client-go/informers/core/v1"
  23. clientset "k8s.io/client-go/kubernetes"
  24. corelisters "k8s.io/client-go/listers/core/v1"
  25. "k8s.io/client-go/tools/cache"
  26. "k8s.io/client-go/util/workqueue"
  27. bootstrapapi "k8s.io/cluster-bootstrap/token/api"
  28. "k8s.io/klog"
  29. api "k8s.io/kubernetes/pkg/apis/core"
  30. "k8s.io/kubernetes/pkg/controller"
  31. "k8s.io/kubernetes/pkg/util/metrics"
  32. )
  33. // TokenCleanerOptions contains options for the TokenCleaner
  34. type TokenCleanerOptions struct {
  35. // TokenSecretNamespace string is the namespace for token Secrets.
  36. TokenSecretNamespace string
  37. // SecretResync is the time.Duration at which to fully re-list secrets.
  38. // If zero, re-list will be delayed as long as possible
  39. SecretResync time.Duration
  40. }
  41. // DefaultTokenCleanerOptions returns a set of default options for creating a
  42. // TokenCleaner
  43. func DefaultTokenCleanerOptions() TokenCleanerOptions {
  44. return TokenCleanerOptions{
  45. TokenSecretNamespace: api.NamespaceSystem,
  46. }
  47. }
  48. // TokenCleaner is a controller that deletes expired tokens
  49. type TokenCleaner struct {
  50. tokenSecretNamespace string
  51. client clientset.Interface
  52. // secretLister is able to list/get secrets and is populated by the shared informer passed to NewTokenCleaner.
  53. secretLister corelisters.SecretLister
  54. // secretSynced returns true if the secret shared informer has been synced at least once.
  55. secretSynced cache.InformerSynced
  56. queue workqueue.RateLimitingInterface
  57. }
  58. // NewTokenCleaner returns a new *NewTokenCleaner.
  59. func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInformer, options TokenCleanerOptions) (*TokenCleaner, error) {
  60. e := &TokenCleaner{
  61. client: cl,
  62. secretLister: secrets.Lister(),
  63. secretSynced: secrets.Informer().HasSynced,
  64. tokenSecretNamespace: options.TokenSecretNamespace,
  65. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
  66. }
  67. if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
  68. if err := metrics.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  69. return nil, err
  70. }
  71. }
  72. secrets.Informer().AddEventHandlerWithResyncPeriod(
  73. cache.FilteringResourceEventHandler{
  74. FilterFunc: func(obj interface{}) bool {
  75. switch t := obj.(type) {
  76. case *v1.Secret:
  77. return t.Type == bootstrapapi.SecretTypeBootstrapToken && t.Namespace == e.tokenSecretNamespace
  78. default:
  79. utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
  80. return false
  81. }
  82. },
  83. Handler: cache.ResourceEventHandlerFuncs{
  84. AddFunc: e.enqueueSecrets,
  85. UpdateFunc: func(oldSecret, newSecret interface{}) { e.enqueueSecrets(newSecret) },
  86. },
  87. },
  88. options.SecretResync,
  89. )
  90. return e, nil
  91. }
  92. // Run runs controller loops and returns when they are done
  93. func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
  94. defer utilruntime.HandleCrash()
  95. defer tc.queue.ShutDown()
  96. klog.Infof("Starting token cleaner controller")
  97. defer klog.Infof("Shutting down token cleaner controller")
  98. if !controller.WaitForCacheSync("token_cleaner", stopCh, tc.secretSynced) {
  99. return
  100. }
  101. go wait.Until(tc.worker, 10*time.Second, stopCh)
  102. <-stopCh
  103. }
  104. func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
  105. key, err := controller.KeyFunc(obj)
  106. if err != nil {
  107. utilruntime.HandleError(err)
  108. return
  109. }
  110. tc.queue.Add(key)
  111. }
  112. // worker runs a thread that dequeues secrets, handles them, and marks them done.
  113. func (tc *TokenCleaner) worker() {
  114. for tc.processNextWorkItem() {
  115. }
  116. }
  117. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  118. func (tc *TokenCleaner) processNextWorkItem() bool {
  119. key, quit := tc.queue.Get()
  120. if quit {
  121. return false
  122. }
  123. defer tc.queue.Done(key)
  124. if err := tc.syncFunc(key.(string)); err != nil {
  125. tc.queue.AddRateLimited(key)
  126. utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
  127. return true
  128. }
  129. tc.queue.Forget(key)
  130. return true
  131. }
  132. func (tc *TokenCleaner) syncFunc(key string) error {
  133. startTime := time.Now()
  134. defer func() {
  135. klog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Since(startTime))
  136. }()
  137. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  138. if err != nil {
  139. return err
  140. }
  141. ret, err := tc.secretLister.Secrets(namespace).Get(name)
  142. if apierrors.IsNotFound(err) {
  143. klog.V(3).Infof("secret has been deleted: %v", key)
  144. return nil
  145. }
  146. if err != nil {
  147. return err
  148. }
  149. if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
  150. tc.evalSecret(ret)
  151. }
  152. return nil
  153. }
  154. func (tc *TokenCleaner) evalSecret(o interface{}) {
  155. secret := o.(*v1.Secret)
  156. if isSecretExpired(secret) {
  157. klog.V(3).Infof("Deleting expired secret %s/%s", secret.Namespace, secret.Name)
  158. var options *metav1.DeleteOptions
  159. if len(secret.UID) > 0 {
  160. options = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &secret.UID}}
  161. }
  162. err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(secret.Name, options)
  163. // NotFound isn't a real error (it's already been deleted)
  164. // Conflict isn't a real error (the UID precondition failed)
  165. if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
  166. klog.V(3).Infof("Error deleting Secret: %v", err)
  167. }
  168. }
  169. }