resource_access.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package resourcequota
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. "github.com/hashicorp/golang-lru"
  19. corev1 "k8s.io/api/core/v1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/labels"
  22. "k8s.io/apiserver/pkg/storage/etcd3"
  23. "k8s.io/client-go/kubernetes"
  24. corev1listers "k8s.io/client-go/listers/core/v1"
  25. )
  26. // QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
  27. // or most commonly a series of deconflicting caches.
  28. type QuotaAccessor interface {
  29. // UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
  30. // An error indicates that write didn't complete successfully.
  31. UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error
  32. // GetQuotas gets all possible quotas for a given namespace
  33. GetQuotas(namespace string) ([]corev1.ResourceQuota, error)
  34. }
  35. type quotaAccessor struct {
  36. client kubernetes.Interface
  37. // lister can list/get quota objects from a shared informer's cache
  38. lister corev1listers.ResourceQuotaLister
  39. // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
  40. // This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
  41. // We track the lookup result here so that for repeated requests, we don't look it up very often.
  42. liveLookupCache *lru.Cache
  43. liveTTL time.Duration
  44. // updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
  45. // back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
  46. // for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
  47. updatedQuotas *lru.Cache
  48. }
  49. // newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
  50. func newQuotaAccessor() (*quotaAccessor, error) {
  51. liveLookupCache, err := lru.New(100)
  52. if err != nil {
  53. return nil, err
  54. }
  55. updatedCache, err := lru.New(100)
  56. if err != nil {
  57. return nil, err
  58. }
  59. // client and lister will be set when SetInternalKubeClientSet and SetInternalKubeInformerFactory are invoked
  60. return &quotaAccessor{
  61. liveLookupCache: liveLookupCache,
  62. liveTTL: time.Duration(30 * time.Second),
  63. updatedQuotas: updatedCache,
  64. }, nil
  65. }
  66. func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error {
  67. updatedQuota, err := e.client.CoreV1().ResourceQuotas(newQuota.Namespace).UpdateStatus(context.TODO(), newQuota, metav1.UpdateOptions{})
  68. if err != nil {
  69. return err
  70. }
  71. key := newQuota.Namespace + "/" + newQuota.Name
  72. e.updatedQuotas.Add(key, updatedQuota)
  73. return nil
  74. }
  75. var etcdVersioner = etcd3.APIObjectVersioner{}
  76. // checkCache compares the passed quota against the value in the look-aside cache and returns the newer
  77. // if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
  78. // being monotonically increasing integers
  79. func (e *quotaAccessor) checkCache(quota *corev1.ResourceQuota) *corev1.ResourceQuota {
  80. key := quota.Namespace + "/" + quota.Name
  81. uncastCachedQuota, ok := e.updatedQuotas.Get(key)
  82. if !ok {
  83. return quota
  84. }
  85. cachedQuota := uncastCachedQuota.(*corev1.ResourceQuota)
  86. if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
  87. e.updatedQuotas.Remove(key)
  88. return quota
  89. }
  90. return cachedQuota
  91. }
  92. func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, error) {
  93. // determine if there are any quotas in this namespace
  94. // if there are no quotas, we don't need to do anything
  95. items, err := e.lister.ResourceQuotas(namespace).List(labels.Everything())
  96. if err != nil {
  97. return nil, fmt.Errorf("error resolving quota: %v", err)
  98. }
  99. // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
  100. if len(items) == 0 {
  101. lruItemObj, ok := e.liveLookupCache.Get(namespace)
  102. if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
  103. // TODO: If there are multiple operations at the same time and cache has just expired,
  104. // this may cause multiple List operations being issued at the same time.
  105. // If there is already in-flight List() for a given namespace, we should wait until
  106. // it is finished and cache is updated instead of doing the same, also to avoid
  107. // throttling - see #22422 for details.
  108. liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
  109. if err != nil {
  110. return nil, err
  111. }
  112. newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
  113. for i := range liveList.Items {
  114. newEntry.items = append(newEntry.items, &liveList.Items[i])
  115. }
  116. e.liveLookupCache.Add(namespace, newEntry)
  117. lruItemObj = newEntry
  118. }
  119. lruEntry := lruItemObj.(liveLookupEntry)
  120. for i := range lruEntry.items {
  121. items = append(items, lruEntry.items[i])
  122. }
  123. }
  124. resourceQuotas := []corev1.ResourceQuota{}
  125. for i := range items {
  126. quota := items[i]
  127. quota = e.checkCache(quota)
  128. // always make a copy. We're going to muck around with this and we should never mutate the originals
  129. resourceQuotas = append(resourceQuotas, *quota)
  130. }
  131. return resourceQuotas, nil
  132. }