resource_quota_controller.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcequota

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	"k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	coreinformers "k8s.io/client-go/informers/core/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/controller"
	quota "k8s.io/kubernetes/pkg/quota/v1"
)

// NamespacedResourcesFunc knows how to discover namespaced resources.
type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)

// ReplenishmentFunc is a signal that a resource changed in a specified namespace
// and that quota for that namespace may need to be recalculated.
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)

// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
	// Must have authority to list all quotas, and update quota status
	QuotaClient corev1client.ResourceQuotasGetter
	// Shared informer for resource quotas
	ResourceQuotaInformer coreinformers.ResourceQuotaInformer
	// Controls full recalculation of quota usage
	ResyncPeriod controller.ResyncPeriodFunc
	// Maintains evaluators that know how to calculate usage for group resource
	Registry quota.Registry
	// Discover list of supported resources on the server.
	DiscoveryFunc NamespacedResourcesFunc
	// A function that returns the list of resources to ignore
	IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
	// InformersStarted knows if informers were started.
	InformersStarted <-chan struct{}
	// InformerFactory interfaces with informers.
	InformerFactory controller.InformerFactory
	// Controls full resync of objects monitored for replenishment.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
}

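// A minimal sketch (illustrative, not part of the original file) of how a
// caller might populate these options. The quotaClient, rqInformer, registry,
// discoveryFunc, informersStarted, and informerFactory values are hypothetical
// inputs assumed to come from the surrounding controller manager:
//
//	options := &ResourceQuotaControllerOptions{
//		QuotaClient:               quotaClient,
//		ResourceQuotaInformer:     rqInformer,
//		ResyncPeriod:              controller.StaticResyncPeriodFunc(5 * time.Minute),
//		ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(12 * time.Hour),
//		Registry:                  registry,
//		DiscoveryFunc:             discoveryFunc,
//		IgnoredResourcesFunc:      func() map[schema.GroupResource]struct{} { return nil },
//		InformersStarted:          informersStarted,
//		InformerFactory:           informerFactory,
//	}
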
// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
	// Must have authority to list all resources in the system, and update quota status
	rqClient corev1client.ResourceQuotasGetter
	// A lister/getter of resource quota objects
	rqLister corelisters.ResourceQuotaLister
	// A list of functions that return true when their caches have synced
	informerSyncedFuncs []cache.InformerSynced
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage information
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// knows how to monitor all the resources tracked by quota and trigger replenishment
	quotaMonitor *QuotaMonitor
	// controls the workers that process quotas
	// this lock is acquired to control write access to the monitors and ensures that all
	// monitors are synced before the controller can process quotas.
	workerLock sync.RWMutex
}

// NewResourceQuotaController creates a quota controller with specified options
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*ResourceQuotaController, error) {
	// build the resource quota controller
	rq := &ResourceQuotaController{
		rqClient:            options.QuotaClient,
		rqLister:            options.ResourceQuotaInformer.Lister(),
		informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:        options.ResyncPeriod,
		registry:            options.Registry,
	}
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey

	options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: rq.addQuota,
			UpdateFunc: func(old, cur interface{}) {
				// We are only interested in observing updates to quota.spec to drive updates to quota.status.
				// We ignore all updates to quota.Status because they are all driven by this controller.
				// IMPORTANT:
				// We do not use this function to queue up a full quota recalculation. To do so would require
				// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
				// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
				// want to pay the price on spurious status updates. As a result, we have a separate routine that is
				// responsible for enqueueing all resource quotas when doing a full resync (enqueueAll).
				oldResourceQuota := old.(*v1.ResourceQuota)
				curResourceQuota := cur.(*v1.ResourceQuota)
				if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(curResourceQuota)
			},
			// This will enter the sync loop and no-op, because the resource quota has been deleted from the store.
			DeleteFunc: rq.enqueueResourceQuota,
		},
		rq.resyncPeriod(),
	)

	if options.DiscoveryFunc != nil {
		qm := &QuotaMonitor{
			informersStarted:  options.InformersStarted,
			informerFactory:   options.InformerFactory,
			ignoredResources:  options.IgnoredResourcesFunc(),
			resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
			resyncPeriod:      options.ReplenishmentResyncPeriod,
			replenishmentFunc: rq.replenishQuota,
			registry:          rq.registry,
		}
		rq.quotaMonitor = qm

		// do initial quota monitor setup. If we have a discovery failure here, it's ok. We'll discover more resources when a later sync happens.
		resources, err := GetQuotableResources(options.DiscoveryFunc)
		if discovery.IsGroupDiscoveryFailedError(err) {
			utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
		} else if err != nil {
			return nil, err
		}

		if err = qm.SyncMonitors(resources); err != nil {
			utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
		}

		// only start quota once all informers synced
		rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
	}

	return rq, nil
}

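// A hedged usage sketch (not in the original file): constructing the
// controller from options populated as shown earlier, and surfacing the
// constructor error:
//
//	rq, err := NewResourceQuotaController(options)
//	if err != nil {
//		klog.Fatalf("failed to create resource quota controller: %v", err)
//	}
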
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func (rq *ResourceQuotaController) enqueueAll() {
	defer klog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
	rqs, err := rq.rqLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
		return
	}
	for i := range rqs {
		key, err := controller.KeyFunc(rqs[i])
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", rqs[i], err))
			continue
		}
		rq.queue.Add(key)
	}
}

// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	rq.queue.Add(key)
}

func (rq *ResourceQuotaController) addQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	resourceQuota := obj.(*v1.ResourceQuota)

	// if we declared an intent that is not yet captured in status, prioritize it
	if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
		rq.missingUsageQueue.Add(key)
		return
	}

	// if we declared a constraint that has no usage (which this controller can calculate), prioritize it
	for constraint := range resourceQuota.Status.Hard {
		if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
			matchedResources := []v1.ResourceName{v1.ResourceName(constraint)}
			for _, evaluator := range rq.registry.List() {
				if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
					rq.missingUsageQueue.Add(key)
					return
				}
			}
		}
	}

	// no special priority, go in normal recalc queue
	rq.queue.Add(key)
}

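// An illustrative trace of the prioritization above (uses resource.MustParse
// from k8s.io/apimachinery/pkg/api/resource, which this file does not import):
// a freshly created quota whose intent is not yet reflected in status is
// routed to the priority queue.
//
//	q := &v1.ResourceQuota{
//		Spec:   v1.ResourceQuotaSpec{Hard: v1.ResourceList{"pods": resource.MustParse("10")}},
//		Status: v1.ResourceQuotaStatus{}, // no Hard/Used recorded yet
//	}
//	rq.addQuota(q) // Spec.Hard != Status.Hard, so q lands on missingUsageQueue
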
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	workFunc := func() bool {
		key, quit := queue.Get()
		if quit {
			return true
		}
		defer queue.Done(key)

		rq.workerLock.RLock()
		defer rq.workerLock.RUnlock()
		err := rq.syncHandler(key.(string))
		if err == nil {
			queue.Forget(key)
			return false
		}
		utilruntime.HandleError(err)
		queue.AddRateLimited(key)
		return false
	}

	return func() {
		for {
			if quit := workFunc(); quit {
				klog.Infof("resource quota controller worker shutting down")
				return
			}
		}
	}
}

// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rq.queue.ShutDown()
	// shut down the priority queue as well so its workers can exit
	defer rq.missingUsageQueue.ShutDown()

	klog.Infof("Starting resource quota controller")
	defer klog.Infof("Shutting down resource quota controller")

	if rq.quotaMonitor != nil {
		go rq.quotaMonitor.Run(stopCh)
	}

	if !cache.WaitForNamedCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
		return
	}

	// the workers that chug through the quota calculation backlog
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}
	// the timer for how often we do a full recalculation across all quotas
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
	<-stopCh
}

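// A minimal start-up sketch (illustrative): five workers per queue, stopped by
// closing stopCh. The stopCh channel is a hypothetical value owned by the caller.
//
//	stopCh := make(chan struct{})
//	go rq.Run(5, stopCh)
//	// ... later, to shut down:
//	close(stopCh)
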
// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// named resourceQuota rather than quota to avoid shadowing the quota package
	resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("Resource quota has been deleted %v", key)
		return nil
	}
	if err != nil {
		klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
		return err
	}
	return rq.syncResourceQuota(resourceQuota)
}

// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) {
	// quota is dirty if any part of spec hard limits differs from the status hard limits
	statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)

	// dirty tracks if the usage status differs from the previous sync,
	// if so, we send a new usage with latest status
	// if this is our first sync, it will be dirty by default, since we need to track usage
	dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil

	used := v1.ResourceList{}
	if resourceQuota.Status.Used != nil {
		used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
	}
	hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)

	// errs rather than errors, to avoid shadowing the errors package
	errs := []error{}

	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
	if err != nil {
		// if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
		errs = append(errs, err)
	}
	for key, value := range newUsage {
		used[key] = value
	}

	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)

	// Create a usage object that is based on the quota resource version that will handle updates
	// by default, we preserve the past usage observation, and set hard to the current spec
	usage := resourceQuota.DeepCopy()
	usage.Status = v1.ResourceQuotaStatus{
		Hard: hardLimits,
		Used: used,
	}

	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

	// there was a change observed by this controller that requires we update quota
	if dirty {
		_, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(context.TODO(), usage, metav1.UpdateOptions{})
		if err != nil {
			errs = append(errs, err)
		}
	}
	return utilerrors.NewAggregate(errs)
}

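// A worked example of the masking step above (the values are illustrative):
// if hardLimits is {pods: 10} and the recalculated usage is
// {pods: 3, services: 1}, quota.Mask keeps only the resources that have hard
// constraints, so the published status becomes Hard={pods: 10}, Used={pods: 3};
// the stray services count is dropped rather than written to status.
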
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupResource schema.GroupResource, namespace string) {
	// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
	evaluator := rq.registry.Get(groupResource)
	if evaluator == nil {
		return
	}

	// check if this namespace even has a quota...
	resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
	if errors.IsNotFound(err) {
		utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
		return
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
		return
	}
	if len(resourceQuotas) == 0 {
		return
	}

	// only queue those quotas that are tracking a resource associated with this kind.
	for i := range resourceQuotas {
		resourceQuota := resourceQuotas[i]
		resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
		if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
			// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
			rq.enqueueResourceQuota(resourceQuota)
		}
	}
}

// Sync periodically resyncs the controller when new resources are observed from discovery.
func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
	// the set of resources observed from discovery on the previous pass
	oldResources := make(map[schema.GroupVersionResource]struct{})
	wait.Until(func() {
		// Get the current resource list from discovery.
		newResources, err := GetQuotableResources(discoveryFunc)
		if err != nil {
			utilruntime.HandleError(err)

			if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
				// In partial discovery cases, don't remove any existing informers, just add new ones
				for k, v := range oldResources {
					newResources[k] = v
				}
			} else {
				// short circuit in non-discovery error cases or if discovery returned zero resources
				return
			}
		}

		// Decide whether discovery has reported a change.
		if reflect.DeepEqual(oldResources, newResources) {
			klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
			return
		}

		// Ensure workers are paused to avoid processing events before informers
		// have resynced.
		rq.workerLock.Lock()
		defer rq.workerLock.Unlock()

		// Something has changed, so track the new state and perform a sync.
		if klog.V(2) {
			klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
		}

		// Perform the monitor resync and wait for controllers to report cache sync.
		if err := rq.resyncMonitors(newResources); err != nil {
			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
			return
		}
		// wait for caches to fill for a while (our sync period).
		// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
		// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
		// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
		if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
			utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
			return
		}

		// success, remember newly synced resources
		oldResources = newResources
		klog.V(2).Infof("synced quota controller")
	}, period, stopCh)
}

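// A hedged sketch of running discovery-driven resync alongside Run. The
// 30-second period mirrors what a controller manager might choose; it is an
// assumption, not a value mandated by this file:
//
//	go rq.Sync(discoveryFunc, 30*time.Second, stopCh)
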
// printDiff returns a human-readable summary of what resources were added and removed
func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
	removed := sets.NewString()
	for oldResource := range oldResources {
		if _, ok := newResources[oldResource]; !ok {
			removed.Insert(fmt.Sprintf("%+v", oldResource))
		}
	}
	added := sets.NewString()
	for newResource := range newResources {
		if _, ok := oldResources[newResource]; !ok {
			added.Insert(fmt.Sprintf("%+v", newResource))
		}
	}
	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
}

// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		select {
		case <-stopCh:
		case <-time.After(timeout):
		}
	}()
	return stopChWithTimeout
}

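// Illustrative use of waitForStopOrTimeout (mirroring Sync above): bound a
// cache sync wait so it gives up after one period even if stopCh stays open.
//
//	synced := cache.WaitForCacheSync(waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced)
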
// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
	if rq.quotaMonitor == nil {
		return nil
	}

	if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
		return err
	}
	rq.quotaMonitor.StartMonitors()
	return nil
}

// GetQuotableResources returns all resources that the quota system should recognize.
// It requires a resource to support the following verbs: 'create', 'list', 'watch', 'delete'.
// This function may return both results and an error. If that happens, it means that the discovery calls were only
// partially successful. A decision about whether to proceed or not is left to the caller.
func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
	possibleResources, discoveryErr := discoveryFunc()
	if discoveryErr != nil && len(possibleResources) == 0 {
		return nil, fmt.Errorf("failed to discover resources: %v", discoveryErr)
	}
	quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "watch", "delete"}}, possibleResources)
	quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
	if err != nil {
		return nil, fmt.Errorf("failed to parse resources: %v", err)
	}
	// return the original discovery error (if any) in addition to the list
	return quotableGroupVersionResources, discoveryErr
}
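
// A hedged example of consuming GetQuotableResources' partial results, modeled
// on the constructor above (discoveryFunc is a hypothetical NamespacedResourcesFunc
// supplied by the caller):
//
//	resources, err := GetQuotableResources(discoveryFunc)
//	if discovery.IsGroupDiscoveryFailedError(err) {
//		// partial discovery: resources is still usable, log and continue
//		utilruntime.HandleError(err)
//	} else if err != nil {
//		return err // total failure: no resources to work with
//	}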