123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package bootstrap
- import (
- "context"
- "fmt"
- "time"
- v1 "k8s.io/api/core/v1"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- bootstrapapi "k8s.io/cluster-bootstrap/token/api"
- bootstrapsecretutil "k8s.io/cluster-bootstrap/util/secrets"
- "k8s.io/component-base/metrics/prometheus/ratelimiter"
- "k8s.io/klog"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/controller"
- )
- // TokenCleanerOptions contains options for the TokenCleaner
- type TokenCleanerOptions struct {
- // TokenSecretNamespace string is the namespace for token Secrets.
- TokenSecretNamespace string
- // SecretResync is the time.Duration at which to fully re-list secrets.
- // If zero, re-list will be delayed as long as possible
- SecretResync time.Duration
- }
- // DefaultTokenCleanerOptions returns a set of default options for creating a
- // TokenCleaner
- func DefaultTokenCleanerOptions() TokenCleanerOptions {
- return TokenCleanerOptions{
- TokenSecretNamespace: api.NamespaceSystem,
- }
- }
- // TokenCleaner is a controller that deletes expired tokens
- type TokenCleaner struct {
- tokenSecretNamespace string
- client clientset.Interface
- // secretLister is able to list/get secrets and is populated by the shared informer passed to NewTokenCleaner.
- secretLister corelisters.SecretLister
- // secretSynced returns true if the secret shared informer has been synced at least once.
- secretSynced cache.InformerSynced
- queue workqueue.RateLimitingInterface
- }
- // NewTokenCleaner returns a new *NewTokenCleaner.
- func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInformer, options TokenCleanerOptions) (*TokenCleaner, error) {
- e := &TokenCleaner{
- client: cl,
- secretLister: secrets.Lister(),
- secretSynced: secrets.Informer().HasSynced,
- tokenSecretNamespace: options.TokenSecretNamespace,
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
- }
- if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
- if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
- return nil, err
- }
- }
- secrets.Informer().AddEventHandlerWithResyncPeriod(
- cache.FilteringResourceEventHandler{
- FilterFunc: func(obj interface{}) bool {
- switch t := obj.(type) {
- case *v1.Secret:
- return t.Type == bootstrapapi.SecretTypeBootstrapToken && t.Namespace == e.tokenSecretNamespace
- default:
- utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
- return false
- }
- },
- Handler: cache.ResourceEventHandlerFuncs{
- AddFunc: e.enqueueSecrets,
- UpdateFunc: func(oldSecret, newSecret interface{}) { e.enqueueSecrets(newSecret) },
- },
- },
- options.SecretResync,
- )
- return e, nil
- }
- // Run runs controller loops and returns when they are done
- func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer tc.queue.ShutDown()
- klog.Infof("Starting token cleaner controller")
- defer klog.Infof("Shutting down token cleaner controller")
- if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) {
- return
- }
- go wait.Until(tc.worker, 10*time.Second, stopCh)
- <-stopCh
- }
- func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(err)
- return
- }
- tc.queue.Add(key)
- }
- // worker runs a thread that dequeues secrets, handles them, and marks them done.
- func (tc *TokenCleaner) worker() {
- for tc.processNextWorkItem() {
- }
- }
- // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
- func (tc *TokenCleaner) processNextWorkItem() bool {
- key, quit := tc.queue.Get()
- if quit {
- return false
- }
- defer tc.queue.Done(key)
- if err := tc.syncFunc(key.(string)); err != nil {
- tc.queue.AddRateLimited(key)
- utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
- return true
- }
- tc.queue.Forget(key)
- return true
- }
- func (tc *TokenCleaner) syncFunc(key string) error {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Since(startTime))
- }()
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- ret, err := tc.secretLister.Secrets(namespace).Get(name)
- if apierrors.IsNotFound(err) {
- klog.V(3).Infof("secret has been deleted: %v", key)
- return nil
- }
- if err != nil {
- return err
- }
- if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
- tc.evalSecret(ret)
- }
- return nil
- }
- func (tc *TokenCleaner) evalSecret(o interface{}) {
- secret := o.(*v1.Secret)
- ttl, alreadyExpired := bootstrapsecretutil.GetExpiration(secret, time.Now())
- if alreadyExpired {
- klog.V(3).Infof("Deleting expired secret %s/%s", secret.Namespace, secret.Name)
- var options *metav1.DeleteOptions
- if len(secret.UID) > 0 {
- options = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &secret.UID}}
- }
- err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, options)
- // NotFound isn't a real error (it's already been deleted)
- // Conflict isn't a real error (the UID precondition failed)
- if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
- klog.V(3).Infof("Error deleting Secret: %v", err)
- }
- } else if ttl > 0 {
- key, err := controller.KeyFunc(o)
- if err != nil {
- utilruntime.HandleError(err)
- return
- }
- tc.queue.AddAfter(key, ttl)
- }
- }
|