123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package resourcequota
- import (
- "fmt"
- "sync"
- "time"
- "k8s.io/klog"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/meta"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/clock"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "k8s.io/kubernetes/pkg/controller"
- quota "k8s.io/kubernetes/pkg/quota/v1"
- "k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
- "k8s.io/kubernetes/pkg/quota/v1/generic"
- )
- type eventType int
- func (e eventType) String() string {
- switch e {
- case addEvent:
- return "add"
- case updateEvent:
- return "update"
- case deleteEvent:
- return "delete"
- default:
- return fmt.Sprintf("unknown(%d)", int(e))
- }
- }
- const (
- addEvent eventType = iota
- updateEvent
- deleteEvent
- )
- type event struct {
- eventType eventType
- obj interface{}
- oldObj interface{}
- gvr schema.GroupVersionResource
- }
- type QuotaMonitor struct {
- // each monitor list/watches a resource and determines if we should replenish quota
- monitors monitors
- monitorLock sync.RWMutex
- // informersStarted is closed after after all of the controllers have been initialized and are running.
- // After that it is safe to start them here, before that it is not.
- informersStarted <-chan struct{}
- // stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
- // This channel is also protected by monitorLock.
- stopCh <-chan struct{}
- // running tracks whether Run() has been called.
- // it is protected by monitorLock.
- running bool
- // monitors are the producer of the resourceChanges queue
- resourceChanges workqueue.RateLimitingInterface
- // interfaces with informers
- informerFactory controller.InformerFactory
- // list of resources to ignore
- ignoredResources map[schema.GroupResource]struct{}
- // The period that should be used to re-sync the monitored resource
- resyncPeriod controller.ResyncPeriodFunc
- // callback to alert that a change may require quota recalculation
- replenishmentFunc ReplenishmentFunc
- // maintains list of evaluators
- registry quota.Registry
- }
- func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory controller.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
- return &QuotaMonitor{
- informersStarted: informersStarted,
- informerFactory: informerFactory,
- ignoredResources: ignoredResources,
- resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
- resyncPeriod: resyncPeriod,
- replenishmentFunc: replenishmentFunc,
- registry: registry,
- }
- }
- // monitor runs a Controller with a local stop channel.
- type monitor struct {
- controller cache.Controller
- // stopCh stops Controller. If stopCh is nil, the monitor is considered to be
- // not yet started.
- stopCh chan struct{}
- }
- // Run is intended to be called in a goroutine. Multiple calls of this is an
- // error.
- func (m *monitor) Run() {
- m.controller.Run(m.stopCh)
- }
- type monitors map[schema.GroupVersionResource]*monitor
- func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) {
- // TODO: pass this down
- clock := clock.RealClock{}
- handlers := cache.ResourceEventHandlerFuncs{
- UpdateFunc: func(oldObj, newObj interface{}) {
- // TODO: leaky abstraction! live w/ it for now, but should pass down an update filter func.
- // we only want to queue the updates we care about though as too much noise will overwhelm queue.
- notifyUpdate := false
- switch resource.GroupResource() {
- case schema.GroupResource{Resource: "pods"}:
- oldPod := oldObj.(*v1.Pod)
- newPod := newObj.(*v1.Pod)
- notifyUpdate = core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock)
- case schema.GroupResource{Resource: "services"}:
- oldService := oldObj.(*v1.Service)
- newService := newObj.(*v1.Service)
- notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService)
- }
- if notifyUpdate {
- event := &event{
- eventType: updateEvent,
- obj: newObj,
- oldObj: oldObj,
- gvr: resource,
- }
- qm.resourceChanges.Add(event)
- }
- },
- DeleteFunc: func(obj interface{}) {
- // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
- if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
- obj = deletedFinalStateUnknown.Obj
- }
- event := &event{
- eventType: deleteEvent,
- obj: obj,
- gvr: resource,
- }
- qm.resourceChanges.Add(event)
- },
- }
- shared, err := qm.informerFactory.ForResource(resource)
- if err == nil {
- klog.V(4).Infof("QuotaMonitor using a shared informer for resource %q", resource.String())
- shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
- return shared.Informer().GetController(), nil
- }
- klog.V(4).Infof("QuotaMonitor unable to use a shared informer for resource %q: %v", resource.String(), err)
- // TODO: if we can share storage with garbage collector, it may make sense to support other resources
- // until that time, aggregated api servers will have to run their own controller to reconcile their own quota.
- return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
- }
- // SyncMonitors rebuilds the monitor set according to the supplied resources,
- // creating or deleting monitors as necessary. It will return any error
- // encountered, but will make an attempt to create a monitor for each resource
- // instead of immediately exiting on an error. It may be called before or after
- // Run. Monitors are NOT started as part of the sync. To ensure all existing
- // monitors are started, call StartMonitors.
- func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
- qm.monitorLock.Lock()
- defer qm.monitorLock.Unlock()
- toRemove := qm.monitors
- if toRemove == nil {
- toRemove = monitors{}
- }
- current := monitors{}
- errs := []error{}
- kept := 0
- added := 0
- for resource := range resources {
- if _, ok := qm.ignoredResources[resource.GroupResource()]; ok {
- continue
- }
- if m, ok := toRemove[resource]; ok {
- current[resource] = m
- delete(toRemove, resource)
- kept++
- continue
- }
- c, err := qm.controllerFor(resource)
- if err != nil {
- errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
- continue
- }
- // check if we need to create an evaluator for this resource (if none previously registered)
- evaluator := qm.registry.Get(resource.GroupResource())
- if evaluator == nil {
- listerFunc := generic.ListerFuncForResourceFunc(qm.informerFactory.ForResource)
- listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
- evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "")
- qm.registry.Add(evaluator)
- klog.Infof("QuotaMonitor created object count evaluator for %s", resource.GroupResource())
- }
- // track the monitor
- current[resource] = &monitor{controller: c}
- added++
- }
- qm.monitors = current
- for _, monitor := range toRemove {
- if monitor.stopCh != nil {
- close(monitor.stopCh)
- }
- }
- klog.V(4).Infof("quota synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
- // NewAggregate returns nil if errs is 0-length
- return utilerrors.NewAggregate(errs)
- }
- // StartMonitors ensures the current set of monitors are running. Any newly
- // started monitors will also cause shared informers to be started.
- //
- // If called before Run, StartMonitors does nothing (as there is no stop channel
- // to support monitor/informer execution).
- func (qm *QuotaMonitor) StartMonitors() {
- qm.monitorLock.Lock()
- defer qm.monitorLock.Unlock()
- if !qm.running {
- return
- }
- // we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
- // that they don't get unexpected events on their work queues.
- <-qm.informersStarted
- monitors := qm.monitors
- started := 0
- for _, monitor := range monitors {
- if monitor.stopCh == nil {
- monitor.stopCh = make(chan struct{})
- qm.informerFactory.Start(qm.stopCh)
- go monitor.Run()
- started++
- }
- }
- klog.V(4).Infof("QuotaMonitor started %d new monitors, %d currently running", started, len(monitors))
- }
- // IsSynced returns true if any monitors exist AND all those monitors'
- // controllers HasSynced functions return true. This means IsSynced could return
- // true at one time, and then later return false if all monitors were
- // reconstructed.
- func (qm *QuotaMonitor) IsSynced() bool {
- qm.monitorLock.RLock()
- defer qm.monitorLock.RUnlock()
- if len(qm.monitors) == 0 {
- klog.V(4).Info("quota monitor not synced: no monitors")
- return false
- }
- for resource, monitor := range qm.monitors {
- if !monitor.controller.HasSynced() {
- klog.V(4).Infof("quota monitor not synced: %v", resource)
- return false
- }
- }
- return true
- }
- // Run sets the stop channel and starts monitor execution until stopCh is
- // closed. Any running monitors will be stopped before Run returns.
- func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
- klog.Infof("QuotaMonitor running")
- defer klog.Infof("QuotaMonitor stopping")
- // Set up the stop channel.
- qm.monitorLock.Lock()
- qm.stopCh = stopCh
- qm.running = true
- qm.monitorLock.Unlock()
- // Start monitors and begin change processing until the stop channel is
- // closed.
- qm.StartMonitors()
- wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
- // Stop any running monitors.
- qm.monitorLock.Lock()
- defer qm.monitorLock.Unlock()
- monitors := qm.monitors
- stopped := 0
- for _, monitor := range monitors {
- if monitor.stopCh != nil {
- stopped++
- close(monitor.stopCh)
- }
- }
- klog.Infof("QuotaMonitor stopped %d of %d monitors", stopped, len(monitors))
- }
- func (qm *QuotaMonitor) runProcessResourceChanges() {
- for qm.processResourceChanges() {
- }
- }
- // Dequeueing an event from resourceChanges to process
- func (qm *QuotaMonitor) processResourceChanges() bool {
- item, quit := qm.resourceChanges.Get()
- if quit {
- return false
- }
- defer qm.resourceChanges.Done(item)
- event, ok := item.(*event)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
- return true
- }
- obj := event.obj
- accessor, err := meta.Accessor(obj)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
- return true
- }
- klog.V(4).Infof("QuotaMonitor process object: %s, namespace %s, name %s, uid %s, event type %v", event.gvr.String(), accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
- qm.replenishmentFunc(event.gvr.GroupResource(), accessor.GetNamespace())
- return true
- }
|