tokencleaner.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package bootstrap
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. v1 "k8s.io/api/core/v1"
  19. apierrors "k8s.io/apimachinery/pkg/api/errors"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. coreinformers "k8s.io/client-go/informers/core/v1"
  24. clientset "k8s.io/client-go/kubernetes"
  25. corelisters "k8s.io/client-go/listers/core/v1"
  26. "k8s.io/client-go/tools/cache"
  27. "k8s.io/client-go/util/workqueue"
  28. bootstrapapi "k8s.io/cluster-bootstrap/token/api"
  29. bootstrapsecretutil "k8s.io/cluster-bootstrap/util/secrets"
  30. "k8s.io/component-base/metrics/prometheus/ratelimiter"
  31. "k8s.io/klog"
  32. api "k8s.io/kubernetes/pkg/apis/core"
  33. "k8s.io/kubernetes/pkg/controller"
  34. )
  35. // TokenCleanerOptions contains options for the TokenCleaner
  36. type TokenCleanerOptions struct {
  37. // TokenSecretNamespace string is the namespace for token Secrets.
  38. TokenSecretNamespace string
  39. // SecretResync is the time.Duration at which to fully re-list secrets.
  40. // If zero, re-list will be delayed as long as possible
  41. SecretResync time.Duration
  42. }
  43. // DefaultTokenCleanerOptions returns a set of default options for creating a
  44. // TokenCleaner
  45. func DefaultTokenCleanerOptions() TokenCleanerOptions {
  46. return TokenCleanerOptions{
  47. TokenSecretNamespace: api.NamespaceSystem,
  48. }
  49. }
  50. // TokenCleaner is a controller that deletes expired tokens
  51. type TokenCleaner struct {
  52. tokenSecretNamespace string
  53. client clientset.Interface
  54. // secretLister is able to list/get secrets and is populated by the shared informer passed to NewTokenCleaner.
  55. secretLister corelisters.SecretLister
  56. // secretSynced returns true if the secret shared informer has been synced at least once.
  57. secretSynced cache.InformerSynced
  58. queue workqueue.RateLimitingInterface
  59. }
  60. // NewTokenCleaner returns a new *NewTokenCleaner.
  61. func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInformer, options TokenCleanerOptions) (*TokenCleaner, error) {
  62. e := &TokenCleaner{
  63. client: cl,
  64. secretLister: secrets.Lister(),
  65. secretSynced: secrets.Informer().HasSynced,
  66. tokenSecretNamespace: options.TokenSecretNamespace,
  67. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
  68. }
  69. if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
  70. if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
  71. return nil, err
  72. }
  73. }
  74. secrets.Informer().AddEventHandlerWithResyncPeriod(
  75. cache.FilteringResourceEventHandler{
  76. FilterFunc: func(obj interface{}) bool {
  77. switch t := obj.(type) {
  78. case *v1.Secret:
  79. return t.Type == bootstrapapi.SecretTypeBootstrapToken && t.Namespace == e.tokenSecretNamespace
  80. default:
  81. utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
  82. return false
  83. }
  84. },
  85. Handler: cache.ResourceEventHandlerFuncs{
  86. AddFunc: e.enqueueSecrets,
  87. UpdateFunc: func(oldSecret, newSecret interface{}) { e.enqueueSecrets(newSecret) },
  88. },
  89. },
  90. options.SecretResync,
  91. )
  92. return e, nil
  93. }
  94. // Run runs controller loops and returns when they are done
  95. func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
  96. defer utilruntime.HandleCrash()
  97. defer tc.queue.ShutDown()
  98. klog.Infof("Starting token cleaner controller")
  99. defer klog.Infof("Shutting down token cleaner controller")
  100. if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) {
  101. return
  102. }
  103. go wait.Until(tc.worker, 10*time.Second, stopCh)
  104. <-stopCh
  105. }
  106. func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
  107. key, err := controller.KeyFunc(obj)
  108. if err != nil {
  109. utilruntime.HandleError(err)
  110. return
  111. }
  112. tc.queue.Add(key)
  113. }
  114. // worker runs a thread that dequeues secrets, handles them, and marks them done.
  115. func (tc *TokenCleaner) worker() {
  116. for tc.processNextWorkItem() {
  117. }
  118. }
  119. // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
  120. func (tc *TokenCleaner) processNextWorkItem() bool {
  121. key, quit := tc.queue.Get()
  122. if quit {
  123. return false
  124. }
  125. defer tc.queue.Done(key)
  126. if err := tc.syncFunc(key.(string)); err != nil {
  127. tc.queue.AddRateLimited(key)
  128. utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
  129. return true
  130. }
  131. tc.queue.Forget(key)
  132. return true
  133. }
  134. func (tc *TokenCleaner) syncFunc(key string) error {
  135. startTime := time.Now()
  136. defer func() {
  137. klog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Since(startTime))
  138. }()
  139. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  140. if err != nil {
  141. return err
  142. }
  143. ret, err := tc.secretLister.Secrets(namespace).Get(name)
  144. if apierrors.IsNotFound(err) {
  145. klog.V(3).Infof("secret has been deleted: %v", key)
  146. return nil
  147. }
  148. if err != nil {
  149. return err
  150. }
  151. if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
  152. tc.evalSecret(ret)
  153. }
  154. return nil
  155. }
  156. func (tc *TokenCleaner) evalSecret(o interface{}) {
  157. secret := o.(*v1.Secret)
  158. ttl, alreadyExpired := bootstrapsecretutil.GetExpiration(secret, time.Now())
  159. if alreadyExpired {
  160. klog.V(3).Infof("Deleting expired secret %s/%s", secret.Namespace, secret.Name)
  161. var options *metav1.DeleteOptions
  162. if len(secret.UID) > 0 {
  163. options = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &secret.UID}}
  164. }
  165. err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, options)
  166. // NotFound isn't a real error (it's already been deleted)
  167. // Conflict isn't a real error (the UID precondition failed)
  168. if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
  169. klog.V(3).Infof("Error deleting Secret: %v", err)
  170. }
  171. } else if ttl > 0 {
  172. key, err := controller.KeyFunc(o)
  173. if err != nil {
  174. utilruntime.HandleError(err)
  175. return
  176. }
  177. tc.queue.AddAfter(key, ttl)
  178. }
  179. }