- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package resourcequota
- import (
- "fmt"
- "reflect"
- "sync"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/runtime/schema"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/discovery"
- coreinformers "k8s.io/client-go/informers/core/v1"
- corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/controller"
- quota "k8s.io/kubernetes/pkg/quota/v1"
- )
- // NamespacedResourcesFunc knows how to discover namespaced resources.
- type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
- // ReplenishmentFunc is a signal that a resource changed in the specified namespace
- // and may require quota to be recalculated.
- type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
- // ResourceQuotaControllerOptions holds options for creating a quota controller
- type ResourceQuotaControllerOptions struct {
- // Must have authority to list all quotas, and update quota status
- QuotaClient corev1client.ResourceQuotasGetter
- // Shared informer for resource quotas
- ResourceQuotaInformer coreinformers.ResourceQuotaInformer
- // Controls full recalculation of quota usage
- ResyncPeriod controller.ResyncPeriodFunc
- // Maintains evaluators that know how to calculate usage for group resource
- Registry quota.Registry
- // Discover list of supported resources on the server.
- DiscoveryFunc NamespacedResourcesFunc
- // A function that returns the list of resources to ignore
- IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
- // InformersStarted knows if informers were started.
- InformersStarted <-chan struct{}
- // InformerFactory interfaces with informers.
- InformerFactory controller.InformerFactory
- // Controls full resync of objects monitored for replenishment.
- ReplenishmentResyncPeriod controller.ResyncPeriodFunc
- }
- // ResourceQuotaController is responsible for tracking quota usage status in the system
- type ResourceQuotaController struct {
- // Must have authority to list all resources in the system, and update quota status
- rqClient corev1client.ResourceQuotasGetter
- // A lister/getter of resource quota objects
- rqLister corelisters.ResourceQuotaLister
- // A list of functions that return true when their caches have synced
- informerSyncedFuncs []cache.InformerSynced
- // ResourceQuota objects that need to be synchronized
- queue workqueue.RateLimitingInterface
- // missingUsageQueue holds objects that are missing the initial usage information
- missingUsageQueue workqueue.RateLimitingInterface
- // To allow injection of syncUsage for testing.
- syncHandler func(key string) error
- // function that controls full recalculation of quota usage
- resyncPeriod controller.ResyncPeriodFunc
- // knows how to calculate usage
- registry quota.Registry
- // knows how to monitor all the resources tracked by quota and trigger replenishment
- quotaMonitor *QuotaMonitor
- // controls the workers that process quotas
- // this lock is acquired to control write access to the monitors and ensures that all
- // monitors are synced before the controller can process quotas.
- workerLock sync.RWMutex
- }
- // NewResourceQuotaController creates a quota controller with specified options
- func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*ResourceQuotaController, error) {
- // build the resource quota controller
- rq := &ResourceQuotaController{
- rqClient: options.QuotaClient,
- rqLister: options.ResourceQuotaInformer.Lister(),
- informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
- missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
- resyncPeriod: options.ResyncPeriod,
- registry: options.Registry,
- }
- // set the synchronization handler
- rq.syncHandler = rq.syncResourceQuotaFromKey
- options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
- cache.ResourceEventHandlerFuncs{
- AddFunc: rq.addQuota,
- UpdateFunc: func(old, cur interface{}) {
- // We are only interested in observing updates to quota.spec to drive updates to quota.status.
- // We ignore all updates to quota.Status because they are all driven by this controller.
- // IMPORTANT:
- // We do not use this function to queue up a full quota recalculation. To do so would require
- // us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
- // that cannot be backed by a cache and result in a full query of a namespace's content, we do not
- // want to pay the price on spurious status updates. As a result, a separate routine is
- // responsible for enqueueing all resource quotas when doing a full resync (enqueueAll).
- oldResourceQuota := old.(*v1.ResourceQuota)
- curResourceQuota := cur.(*v1.ResourceQuota)
- if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
- return
- }
- rq.addQuota(curResourceQuota)
- },
- // This will enter the sync loop and no-op, because the resource quota has been deleted from the store.
- DeleteFunc: rq.enqueueResourceQuota,
- },
- rq.resyncPeriod(),
- )
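- // The quota monitor is only wired up when discovery is available. Without it, the
- // controller still recalculates quotas on spec changes and on the full resync period,
- // but it cannot observe object changes in a namespace to trigger replenishment.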
- if options.DiscoveryFunc != nil {
- qm := &QuotaMonitor{
- informersStarted: options.InformersStarted,
- informerFactory: options.InformerFactory,
- ignoredResources: options.IgnoredResourcesFunc(),
- resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
- resyncPeriod: options.ReplenishmentResyncPeriod,
- replenishmentFunc: rq.replenishQuota,
- registry: rq.registry,
- }
- rq.quotaMonitor = qm
- // do initial quota monitor setup. If we have a discovery failure here, it's ok. We'll discover more resources when a later sync happens.
- resources, err := GetQuotableResources(options.DiscoveryFunc)
- if discovery.IsGroupDiscoveryFailedError(err) {
- utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
- } else if err != nil {
- return nil, err
- }
- if err = qm.SyncMonitors(resources); err != nil {
- utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
- }
- // only start quota once all informers synced
- rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
- }
- return rq, nil
- }
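- // A minimal wiring sketch (illustrative values, not recommended defaults): construct
- // the controller, start its workers, and run the discovery-driven Sync loop alongside it:
- //
- //   rq, err := NewResourceQuotaController(options)
- //   if err != nil {
- //       // handle error
- //   }
- //   go rq.Run(5, stopCh)
- //   go rq.Sync(options.DiscoveryFunc, 30*time.Second, stopCh)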
- // enqueueAll is called at the full resync period interval to force a full recalculation of quota usage statistics
- func (rq *ResourceQuotaController) enqueueAll() {
- defer klog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
- rqs, err := rq.rqLister.List(labels.Everything())
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
- return
- }
- for i := range rqs {
- key, err := controller.KeyFunc(rqs[i])
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", rqs[i], err))
- continue
- }
- rq.queue.Add(key)
- }
- }
- // obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
- func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
- return
- }
- rq.queue.Add(key)
- }
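- // addQuota enqueues a quota, routing quotas whose status does not yet reflect their
- // spec, or that lack usage this controller can compute, to missingUsageQueue; that
- // queue has its own workers (see Run) so publishing initial usage is not delayed
- // behind the recalculation backlog.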
- func (rq *ResourceQuotaController) addQuota(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
- return
- }
- resourceQuota := obj.(*v1.ResourceQuota)
- // if we declared an intent that is not yet captured in status, prioritize it
- if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
- rq.missingUsageQueue.Add(key)
- return
- }
- // if we declared a constraint that has no usage yet (and this controller can calculate that usage), prioritize it
- for constraint := range resourceQuota.Status.Hard {
- if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
- matchedResources := []v1.ResourceName{v1.ResourceName(constraint)}
- for _, evaluator := range rq.registry.List() {
- if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
- rq.missingUsageQueue.Add(key)
- return
- }
- }
- }
- }
- // no special priority, go in normal recalc queue
- rq.queue.Add(key)
- }
- // worker runs a worker thread that just dequeues items, processes them, and marks them done.
- func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
- workFunc := func() bool {
- key, quit := queue.Get()
- if quit {
- return true
- }
- defer queue.Done(key)
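- // workers hold the read lock while processing an item so that Sync, which takes
- // the write lock, can pause all quota processing until monitors have resynced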
- rq.workerLock.RLock()
- defer rq.workerLock.RUnlock()
- err := rq.syncHandler(key.(string))
- if err == nil {
- queue.Forget(key)
- return false
- }
- utilruntime.HandleError(err)
- queue.AddRateLimited(key)
- return false
- }
- return func() {
- for {
- if quit := workFunc(); quit {
- klog.Infof("resource quota controller worker shutting down")
- return
- }
- }
- }
- }
- // Run begins the quota controller using the specified number of workers
- func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- defer rq.queue.ShutDown()
- klog.Infof("Starting resource quota controller")
- defer klog.Infof("Shutting down resource quota controller")
- if rq.quotaMonitor != nil {
- go rq.quotaMonitor.Run(stopCh)
- }
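- // qm.IsSynced was appended to informerSyncedFuncs in the constructor, so the wait
- // below also covers the quota monitor's caches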
- if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
- return
- }
- // the workers that chug through the quota calculation backlog
- for i := 0; i < workers; i++ {
- go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
- go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
- }
- // the timer for how often we do a full recalculation across all quotas
- go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
- <-stopCh
- }
- // syncResourceQuotaFromKey syncs a quota key
- func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
- startTime := time.Now()
- defer func() {
- klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
- }()
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- quota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
- if errors.IsNotFound(err) {
- klog.Infof("Resource quota has been deleted %v", key)
- return nil
- }
- if err != nil {
- klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
- return err
- }
- return rq.syncResourceQuota(quota)
- }
- // syncResourceQuota runs a complete sync of resource quota status across all known kinds
- func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) {
- // quota is dirty if any part of spec hard limits differs from the status hard limits
- statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
- // dirty tracks if the usage status differs from the previous sync,
- // if so, we send a new usage with latest status
- // if this is our first sync, it will be dirty by default, since we need to track usage
- dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
- used := v1.ResourceList{}
- if resourceQuota.Status.Used != nil {
- used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
- }
- hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)
- errors := []error{}
- newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
- if err != nil {
- // if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
- errors = append(errors, err)
- }
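- // merge the freshly calculated usage over the previously observed usage; resources
- // that could not be evaluated in this pass keep their last observed values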
- for key, value := range newUsage {
- used[key] = value
- }
- // ensure the set of used values matches the resources that have hard constraints
- hardResources := quota.ResourceNames(hardLimits)
- used = quota.Mask(used, hardResources)
- // Create a usage object based on the current quota object (so the update carries its resource version).
- // By default, we preserve the past usage observation and set hard to the current spec.
- usage := resourceQuota.DeepCopy()
- usage.Status = v1.ResourceQuotaStatus{
- Hard: hardLimits,
- Used: used,
- }
- dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
- // there was a change observed by this controller that requires we update quota
- if dirty {
- _, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(usage)
- if err != nil {
- errors = append(errors, err)
- }
- }
- return utilerrors.NewAggregate(errors)
- }
- // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
- func (rq *ResourceQuotaController) replenishQuota(groupResource schema.GroupResource, namespace string) {
- // check if the quota controller can evaluate this groupResource; if not, ignore it altogether...
- evaluator := rq.registry.Get(groupResource)
- if evaluator == nil {
- return
- }
- // check if this namespace even has a quota...
- resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
- if errors.IsNotFound(err) {
- utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
- return
- }
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
- return
- }
- if len(resourceQuotas) == 0 {
- return
- }
- // only queue those quotas that are tracking a resource associated with this kind.
- for i := range resourceQuotas {
- resourceQuota := resourceQuotas[i]
- resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
- if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
- // TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
- rq.enqueueResourceQuota(resourceQuota)
- }
- }
- }
- // Sync periodically resyncs the controller when new resources are observed from discovery.
- func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
- // Track the resources reported by discovery on the previous pass so changes can be detected.
- oldResources := make(map[schema.GroupVersionResource]struct{})
- wait.Until(func() {
- // Get the current resource list from discovery.
- newResources, err := GetQuotableResources(discoveryFunc)
- if err != nil {
- utilruntime.HandleError(err)
- if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
- // In partial discovery cases, don't remove any existing informers, just add new ones
- for k, v := range oldResources {
- newResources[k] = v
- }
- } else {
- // short circuit in non-discovery error cases or if discovery returned zero resources
- return
- }
- }
- // Decide whether discovery has reported a change.
- if reflect.DeepEqual(oldResources, newResources) {
- klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
- return
- }
- // Ensure workers are paused to avoid processing events before informers
- // have resynced.
- rq.workerLock.Lock()
- defer rq.workerLock.Unlock()
- // Something has changed, so track the new state and perform a sync.
- if klog.V(2) {
- klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
- }
- // Perform the monitor resync and wait for controllers to report cache sync.
- if err := rq.resyncMonitors(newResources); err != nil {
- utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
- return
- }
- // wait for caches to fill for a while (our sync period).
- // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
- // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
- // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
- if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
- utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
- return
- }
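- // any failure above returns without updating oldResources, so the same discovery
- // delta is detected and retried on the next period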
- // success, remember newly synced resources
- oldResources = newResources
- klog.V(2).Infof("synced quota controller")
- }, period, stopCh)
- }
- // printDiff returns a human-readable summary of what resources were added and removed
- func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
- removed := sets.NewString()
- for oldResource := range oldResources {
- if _, ok := newResources[oldResource]; !ok {
- removed.Insert(fmt.Sprintf("%+v", oldResource))
- }
- }
- added := sets.NewString()
- for newResource := range newResources {
- if _, ok := oldResources[newResource]; !ok {
- added.Insert(fmt.Sprintf("%+v", newResource))
- }
- }
- return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
- }
- // waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
- func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
- stopChWithTimeout := make(chan struct{})
- go func() {
- defer close(stopChWithTimeout)
- select {
- case <-stopCh:
- case <-time.After(timeout):
- }
- }()
- return stopChWithTimeout
- }
- // resyncMonitors starts or stops quota monitors as needed to ensure that all
- // (and only) those resources present in the map are monitored.
- func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
- if rq.quotaMonitor == nil {
- return nil
- }
- if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
- return err
- }
- rq.quotaMonitor.StartMonitors()
- return nil
- }
- // GetQuotableResources returns all resources that the quota system should recognize.
- // It requires that a resource support the following verbs: 'create', 'list', 'watch', 'delete'.
- // This function may return both results and an error. If that happens, it means that the discovery calls were only
- // partially successful. A decision about whether to proceed or not is left to the caller.
- func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
- possibleResources, discoveryErr := discoveryFunc()
- if discoveryErr != nil && len(possibleResources) == 0 {
- return nil, fmt.Errorf("failed to discover resources: %v", discoveryErr)
- }
- quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "watch", "delete"}}, possibleResources)
- quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
- if err != nil {
- return nil, fmt.Errorf("Failed to parse resources: %v", err)
- }
- // return the original discovery error (if any) in addition to the list
- return quotableGroupVersionResources, discoveryErr
- }