resource_quota_controller.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcequota

import (
    "fmt"
    "reflect"
    "sync"
    "time"

    "k8s.io/api/core/v1"
    apiequality "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime/schema"
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/discovery"
    coreinformers "k8s.io/client-go/informers/core/v1"
    corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"

    "k8s.io/kubernetes/pkg/controller"
    quota "k8s.io/kubernetes/pkg/quota/v1"
)

// NamespacedResourcesFunc knows how to discover namespaced resources.
type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)

// ReplenishmentFunc is a signal that a resource changed in the specified namespace
// and that quota may need to be recalculated.
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)

// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
    // Must have authority to list all quotas, and update quota status
    QuotaClient corev1client.ResourceQuotasGetter
    // Shared informer for resource quotas
    ResourceQuotaInformer coreinformers.ResourceQuotaInformer
    // Controls full recalculation of quota usage
    ResyncPeriod controller.ResyncPeriodFunc
    // Maintains evaluators that know how to calculate usage for a group resource
    Registry quota.Registry
    // Discovers the list of supported resources on the server.
    DiscoveryFunc NamespacedResourcesFunc
    // A function that returns the list of resources to ignore
    IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
    // InformersStarted knows if informers were started.
    InformersStarted <-chan struct{}
    // InformerFactory interfaces with informers.
    InformerFactory controller.InformerFactory
    // Controls full resync of objects monitored for replenishment.
    ReplenishmentResyncPeriod controller.ResyncPeriodFunc
}

// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
    // Must have authority to list all resources in the system, and update quota status
    rqClient corev1client.ResourceQuotasGetter
    // A lister/getter of resource quota objects
    rqLister corelisters.ResourceQuotaLister
    // A list of functions that return true when their caches have synced
    informerSyncedFuncs []cache.InformerSynced
    // ResourceQuota objects that need to be synchronized
    queue workqueue.RateLimitingInterface
    // missingUsageQueue holds objects that are missing the initial usage information
    missingUsageQueue workqueue.RateLimitingInterface
    // To allow injection of syncUsage for testing.
    syncHandler func(key string) error
    // function that controls full recalculation of quota usage
    resyncPeriod controller.ResyncPeriodFunc
    // knows how to calculate usage
    registry quota.Registry
    // knows how to monitor all the resources tracked by quota and trigger replenishment
    quotaMonitor *QuotaMonitor
    // controls the workers that process quotas
    // this lock is acquired to control write access to the monitors and ensures that all
    // monitors are synced before the controller can process quotas.
    workerLock sync.RWMutex
}

// NewResourceQuotaController creates a quota controller with specified options
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*ResourceQuotaController, error) {
    // build the resource quota controller
    rq := &ResourceQuotaController{
        rqClient:            options.QuotaClient,
        rqLister:            options.ResourceQuotaInformer.Lister(),
        informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
        queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
        missingUsageQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
        resyncPeriod:        options.ResyncPeriod,
        registry:            options.Registry,
    }
    // set the synchronization handler
    rq.syncHandler = rq.syncResourceQuotaFromKey

    options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc: rq.addQuota,
            UpdateFunc: func(old, cur interface{}) {
                // We are only interested in observing updates to quota.Spec to drive updates to quota.Status.
                // We ignore all updates to quota.Status because they are all driven by this controller.
                // IMPORTANT:
                // We do not use this function to queue up a full quota recalculation. To do so would require
                // us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
                // that cannot be backed by a cache and result in a full query of a namespace's content, we do not
                // want to pay the price on spurious status updates. As a result, we have a separate routine that is
                // responsible for enqueueing all resource quotas when doing a full resync (enqueueAll).
                oldResourceQuota := old.(*v1.ResourceQuota)
                curResourceQuota := cur.(*v1.ResourceQuota)
                if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
                    return
                }
                rq.addQuota(curResourceQuota)
            },
            // This will enter the sync loop and no-op, because the quota has been
            // deleted from the store.
            DeleteFunc: rq.enqueueResourceQuota,
        },
        rq.resyncPeriod(),
    )

    if options.DiscoveryFunc != nil {
        qm := &QuotaMonitor{
            informersStarted:  options.InformersStarted,
            informerFactory:   options.InformerFactory,
            ignoredResources:  options.IgnoredResourcesFunc(),
            resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
            resyncPeriod:      options.ReplenishmentResyncPeriod,
            replenishmentFunc: rq.replenishQuota,
            registry:          rq.registry,
        }
        rq.quotaMonitor = qm

        // do initial quota monitor setup. If we have a discovery failure here, it's ok.
        // We'll discover more resources when a later sync happens.
        resources, err := GetQuotableResources(options.DiscoveryFunc)
        if discovery.IsGroupDiscoveryFailedError(err) {
            utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
        } else if err != nil {
            return nil, err
        }

        if err = qm.SyncMonitors(resources); err != nil {
            utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
        }

        // only start quota once all informers synced
        rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
    }

    return rq, nil
}
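
// A condensed construction sketch (illustrative only; kubeClient, informerFactory,
// informersStarted, and quotaConfiguration are assumed to be built by the caller,
// roughly as kube-controller-manager wires this controller; generic is
// k8s.io/kubernetes/pkg/quota/v1/generic, and the resync periods are arbitrary):
//
//     rq, err := NewResourceQuotaController(&ResourceQuotaControllerOptions{
//         QuotaClient:               kubeClient.CoreV1(),
//         ResourceQuotaInformer:     informerFactory.Core().V1().ResourceQuotas(),
//         ResyncPeriod:              controller.StaticResyncPeriodFunc(5 * time.Minute),
//         Registry:                  generic.NewRegistry(quotaConfiguration.Evaluators()),
//         DiscoveryFunc:             kubeClient.Discovery().ServerPreferredNamespacedResources,
//         IgnoredResourcesFunc:      quotaConfiguration.IgnoredResources,
//         InformersStarted:          informersStarted,
//         InformerFactory:           informerFactory,
//         ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(12 * time.Hour),
//     })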

// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func (rq *ResourceQuotaController) enqueueAll() {
    defer klog.V(4).Infof("Resource quota controller queued all resource quotas for full recalculation of usage")
    rqs, err := rq.rqLister.List(labels.Everything())
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
        return
    }
    for i := range rqs {
        key, err := controller.KeyFunc(rqs[i])
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", rqs[i], err))
            continue
        }
        rq.queue.Add(key)
    }
}

// obj could be a *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
        return
    }
    rq.queue.Add(key)
}

func (rq *ResourceQuotaController) addQuota(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
        return
    }

    resourceQuota := obj.(*v1.ResourceQuota)

    // if we declared an intent that is not yet captured in status, prioritize it
    if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
        rq.missingUsageQueue.Add(key)
        return
    }

    // if we declared a constraint that has no usage (which this controller can calculate), prioritize it
    for constraint := range resourceQuota.Status.Hard {
        if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
            matchedResources := []v1.ResourceName{v1.ResourceName(constraint)}
            for _, evaluator := range rq.registry.List() {
                if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
                    rq.missingUsageQueue.Add(key)
                    return
                }
            }
        }
    }

    // no special priority, go in normal recalc queue
    rq.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
    workFunc := func() bool {
        key, quit := queue.Get()
        if quit {
            return true
        }
        defer queue.Done(key)
        rq.workerLock.RLock()
        defer rq.workerLock.RUnlock()
        err := rq.syncHandler(key.(string))
        if err == nil {
            queue.Forget(key)
            return false
        }
        utilruntime.HandleError(err)
        queue.AddRateLimited(key)
        return false
    }

    return func() {
        for {
            if quit := workFunc(); quit {
                klog.Infof("resource quota controller worker shutting down")
                return
            }
        }
    }
}

// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rq.queue.ShutDown()

    klog.Infof("Starting resource quota controller")
    defer klog.Infof("Shutting down resource quota controller")

    if rq.quotaMonitor != nil {
        go rq.quotaMonitor.Run(stopCh)
    }

    if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
        return
    }

    // the workers that chug through the quota calculation backlog
    for i := 0; i < workers; i++ {
        go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
        go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
    }
    // the timer for how often we do a full recalculation across all quotas
    go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
    <-stopCh
}

// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    quota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.Infof("Resource quota %v has been deleted", key)
        return nil
    }
    if err != nil {
        klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
        return err
    }
    return rq.syncResourceQuota(quota)
}

// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) {
    // quota is dirty if any part of spec hard limits differs from the status hard limits
    statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)

    // dirty tracks if the usage status differs from the previous sync;
    // if so, we send a new usage with the latest status.
    // if this is our first sync, it will be dirty by default, since we need to track usage
    dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil

    used := v1.ResourceList{}
    if resourceQuota.Status.Used != nil {
        used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
    }
    hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)

    errors := []error{}

    newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
    if err != nil {
        // if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
        errors = append(errors, err)
    }
    for key, value := range newUsage {
        used[key] = value
    }

    // ensure set of used values match those that have hard constraints
    hardResources := quota.ResourceNames(hardLimits)
    used = quota.Mask(used, hardResources)
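
    // For example (illustrative values), masking usage {"pods": 3, "secrets": 9}
    // against hard limits that only name "pods" yields {"pods": 3}; usage entries
    // for resources the quota no longer constrains are dropped.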

    // Create a usage object based on the quota's resource version so that stale
    // updates are rejected; by default, we preserve the past usage observation
    // and set hard to the current spec
    usage := resourceQuota.DeepCopy()
    usage.Status = v1.ResourceQuotaStatus{
        Hard: hardLimits,
        Used: used,
    }

    dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

    // there was a change observed by this controller that requires we update quota
    if dirty {
        _, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(usage)
        if err != nil {
            errors = append(errors, err)
        }
    }
    return utilerrors.NewAggregate(errors)
}

// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupResource schema.GroupResource, namespace string) {
    // check if the quota controller can evaluate this groupResource; if not, ignore it altogether...
    evaluator := rq.registry.Get(groupResource)
    if evaluator == nil {
        return
    }

    // check if this namespace even has a quota...
    resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
    if errors.IsNotFound(err) {
        utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
        return
    }
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
        return
    }
    if len(resourceQuotas) == 0 {
        return
    }

    // only queue those quotas that are tracking a resource associated with this kind.
    for i := range resourceQuotas {
        resourceQuota := resourceQuotas[i]
        resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
        if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
            // TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
            rq.enqueueResourceQuota(resourceQuota)
        }
    }
}

// Sync periodically resyncs the controller when new resources are observed from discovery.
func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
    // Track the resource set observed at the previous sync so we can detect changes.
    oldResources := make(map[schema.GroupVersionResource]struct{})
    wait.Until(func() {
        // Get the current resource list from discovery.
        newResources, err := GetQuotableResources(discoveryFunc)
        if err != nil {
            utilruntime.HandleError(err)

            if discovery.IsGroupDiscoveryFailedError(err) && len(newResources) > 0 {
                // In partial discovery cases, don't remove any existing informers, just add new ones
                for k, v := range oldResources {
                    newResources[k] = v
                }
            } else {
                // short circuit in non-discovery error cases or if discovery returned zero resources
                return
            }
        }

        // Decide whether discovery has reported a change.
        if reflect.DeepEqual(oldResources, newResources) {
            klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
            return
        }

        // Ensure workers are paused to avoid processing events before informers
        // have resynced.
        rq.workerLock.Lock()
        defer rq.workerLock.Unlock()

        // Something has changed, so track the new state and perform a sync.
        if klog.V(2) {
            klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
        }

        // Perform the monitor resync and wait for controllers to report cache sync.
        if err := rq.resyncMonitors(newResources); err != nil {
            utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
            return
        }
        // wait for caches to fill for a while (our sync period).
        // this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
        // informers keep attempting to sync in the background, so retrying doesn't interrupt them.
        // the call to resyncMonitors on the reattempt will no-op for resources that still exist.
        if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
            utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
            return
        }

        // success, remember newly synced resources
        oldResources = newResources
        klog.V(2).Infof("synced quota controller")
    }, period, stopCh)
}
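
// A minimal start-up sketch (illustrative only; the worker count, stopCh, and
// discoveryFunc come from the caller, much as kube-controller-manager starts
// this controller):
//
//     go rq.Run(5, stopCh)
//     // periodically refresh the set of quotable resources from discovery
//     go rq.Sync(discoveryFunc, 30*time.Second, stopCh)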

// printDiff returns a human-readable summary of what resources were added and removed
func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
    removed := sets.NewString()
    for oldResource := range oldResources {
        if _, ok := newResources[oldResource]; !ok {
            removed.Insert(fmt.Sprintf("%+v", oldResource))
        }
    }
    added := sets.NewString()
    for newResource := range newResources {
        if _, ok := oldResources[newResource]; !ok {
            added.Insert(fmt.Sprintf("%+v", newResource))
        }
    }
    return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
}

// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
    stopChWithTimeout := make(chan struct{})
    go func() {
        defer close(stopChWithTimeout)
        select {
        case <-stopCh:
        case <-time.After(timeout):
        }
    }()
    return stopChWithTimeout
}

// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
    if rq.quotaMonitor == nil {
        return nil
    }

    if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
        return err
    }
    rq.quotaMonitor.StartMonitors()
    return nil
}

// GetQuotableResources returns all resources that the quota system should recognize.
// It requires a resource to support the following verbs: 'create', 'list', 'watch', 'delete'.
// This function may return both results and an error. If that happens, it means that the discovery calls were only
// partially successful. A decision about whether to proceed or not is left to the caller.
func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
    possibleResources, discoveryErr := discoveryFunc()
    if discoveryErr != nil && len(possibleResources) == 0 {
        return nil, fmt.Errorf("failed to discover resources: %v", discoveryErr)
    }
    quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "watch", "delete"}}, possibleResources)
    quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
    if err != nil {
        return nil, fmt.Errorf("failed to parse resources: %v", err)
    }
    // return the original discovery error (if any) in addition to the list
    return quotableGroupVersionResources, discoveryErr
}
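
// A minimal wiring sketch for the discovery function (illustrative only; assumes
// an existing kubernetes.Interface named kubeClient):
//
//     resources, err := GetQuotableResources(kubeClient.Discovery().ServerPreferredNamespacedResources)
//     if err != nil {
//         // a non-nil error alongside a non-empty result means discovery was only
//         // partially successful; the caller decides whether to proceed
//     }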