123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- package namespace
- import (
- "fmt"
- "time"
- "golang.org/x/time/rate"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- coreinformers "k8s.io/client-go/informers/core/v1"
- clientset "k8s.io/client-go/kubernetes"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/metadata"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/component-base/metrics/prometheus/ratelimiter"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/namespace/deletion"
- "k8s.io/klog"
- )
- const (
-
-
-
-
-
-
- namespaceDeletionGracePeriod = 5 * time.Second
- )
- type NamespaceController struct {
-
- lister corelisters.NamespaceLister
-
- listerSynced cache.InformerSynced
-
- queue workqueue.RateLimitingInterface
-
- namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
- }
- func NewNamespaceController(
- kubeClient clientset.Interface,
- metadataClient metadata.Interface,
- discoverResourcesFn func() ([]*metav1.APIResourceList, error),
- namespaceInformer coreinformers.NamespaceInformer,
- resyncPeriod time.Duration,
- finalizerToken v1.FinalizerName) *NamespaceController {
-
- namespaceController := &NamespaceController{
- queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
- namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
- }
- if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
- ratelimiter.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
- }
-
- namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- namespace := obj.(*v1.Namespace)
- namespaceController.enqueueNamespace(namespace)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- namespace := newObj.(*v1.Namespace)
- namespaceController.enqueueNamespace(namespace)
- },
- },
- resyncPeriod,
- )
- namespaceController.lister = namespaceInformer.Lister()
- namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
- return namespaceController
- }
- func nsControllerRateLimiter() workqueue.RateLimiter {
- return workqueue.NewMaxOfRateLimiter(
-
- workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 60*time.Second),
-
- &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
- )
- }
- func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
- return
- }
- namespace := obj.(*v1.Namespace)
-
- if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
- return
- }
-
-
- nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
- }
- func (nm *NamespaceController) worker() {
- workFunc := func() bool {
- key, quit := nm.queue.Get()
- if quit {
- return true
- }
- defer nm.queue.Done(key)
- err := nm.syncNamespaceFromKey(key.(string))
- if err == nil {
-
- nm.queue.Forget(key)
- return false
- }
- if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
- t := estimate.Estimate/2 + 1
- klog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
- nm.queue.AddAfter(key, time.Duration(t)*time.Second)
- } else {
-
- nm.queue.AddRateLimited(key)
- utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", key, err))
- }
- return false
- }
- for {
- quit := workFunc()
- if quit {
- return
- }
- }
- }
- func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
- }()
- namespace, err := nm.lister.Get(key)
- if errors.IsNotFound(err) {
- klog.Infof("Namespace has been deleted %v", key)
- return nil
- }
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
- return err
- }
- return nm.namespacedResourcesDeleter.Delete(namespace.Name)
- }
- func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer nm.queue.ShutDown()
- klog.Infof("Starting namespace controller")
- defer klog.Infof("Shutting down namespace controller")
- if !cache.WaitForNamedCacheSync("namespace", stopCh, nm.listerSynced) {
- return
- }
- klog.V(5).Info("Starting workers of namespace controller")
- for i := 0; i < workers; i++ {
- go wait.Until(nm.worker, time.Second, stopCh)
- }
- <-stopCh
- }
|