resource_access.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package resourcequota
  14. import (
  15. "fmt"
  16. "time"
  17. "github.com/hashicorp/golang-lru"
  18. corev1 "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/labels"
  21. "k8s.io/apiserver/pkg/storage/etcd"
  22. "k8s.io/client-go/kubernetes"
  23. corev1listers "k8s.io/client-go/listers/core/v1"
  24. )
  25. // QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
  26. // or most commonly a series of deconflicting caches.
  27. type QuotaAccessor interface {
  28. // UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
  29. // An error indicates that write didn't complete successfully.
  30. UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error
  31. // GetQuotas gets all possible quotas for a given namespace
  32. GetQuotas(namespace string) ([]corev1.ResourceQuota, error)
  33. }
  34. type quotaAccessor struct {
  35. client kubernetes.Interface
  36. // lister can list/get quota objects from a shared informer's cache
  37. lister corev1listers.ResourceQuotaLister
  38. // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
  39. // This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
  40. // We track the lookup result here so that for repeated requests, we don't look it up very often.
  41. liveLookupCache *lru.Cache
  42. liveTTL time.Duration
  43. // updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
  44. // back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
  45. // for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
  46. updatedQuotas *lru.Cache
  47. }
  48. // newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
  49. func newQuotaAccessor() (*quotaAccessor, error) {
  50. liveLookupCache, err := lru.New(100)
  51. if err != nil {
  52. return nil, err
  53. }
  54. updatedCache, err := lru.New(100)
  55. if err != nil {
  56. return nil, err
  57. }
  58. // client and lister will be set when SetInternalKubeClientSet and SetInternalKubeInformerFactory are invoked
  59. return &quotaAccessor{
  60. liveLookupCache: liveLookupCache,
  61. liveTTL: time.Duration(30 * time.Second),
  62. updatedQuotas: updatedCache,
  63. }, nil
  64. }
  65. func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error {
  66. updatedQuota, err := e.client.CoreV1().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
  67. if err != nil {
  68. return err
  69. }
  70. key := newQuota.Namespace + "/" + newQuota.Name
  71. e.updatedQuotas.Add(key, updatedQuota)
  72. return nil
  73. }
  74. var etcdVersioner = etcd.APIObjectVersioner{}
  75. // checkCache compares the passed quota against the value in the look-aside cache and returns the newer
  76. // if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
  77. // being monotonically increasing integers
  78. func (e *quotaAccessor) checkCache(quota *corev1.ResourceQuota) *corev1.ResourceQuota {
  79. key := quota.Namespace + "/" + quota.Name
  80. uncastCachedQuota, ok := e.updatedQuotas.Get(key)
  81. if !ok {
  82. return quota
  83. }
  84. cachedQuota := uncastCachedQuota.(*corev1.ResourceQuota)
  85. if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
  86. e.updatedQuotas.Remove(key)
  87. return quota
  88. }
  89. return cachedQuota
  90. }
  91. func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, error) {
  92. // determine if there are any quotas in this namespace
  93. // if there are no quotas, we don't need to do anything
  94. items, err := e.lister.ResourceQuotas(namespace).List(labels.Everything())
  95. if err != nil {
  96. return nil, fmt.Errorf("error resolving quota: %v", err)
  97. }
  98. // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
  99. if len(items) == 0 {
  100. lruItemObj, ok := e.liveLookupCache.Get(namespace)
  101. if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
  102. // TODO: If there are multiple operations at the same time and cache has just expired,
  103. // this may cause multiple List operations being issued at the same time.
  104. // If there is already in-flight List() for a given namespace, we should wait until
  105. // it is finished and cache is updated instead of doing the same, also to avoid
  106. // throttling - see #22422 for details.
  107. liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(metav1.ListOptions{})
  108. if err != nil {
  109. return nil, err
  110. }
  111. newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
  112. for i := range liveList.Items {
  113. newEntry.items = append(newEntry.items, &liveList.Items[i])
  114. }
  115. e.liveLookupCache.Add(namespace, newEntry)
  116. lruItemObj = newEntry
  117. }
  118. lruEntry := lruItemObj.(liveLookupEntry)
  119. for i := range lruEntry.items {
  120. items = append(items, lruEntry.items[i])
  121. }
  122. }
  123. resourceQuotas := []corev1.ResourceQuota{}
  124. for i := range items {
  125. quota := items[i]
  126. quota = e.checkCache(quota)
  127. // always make a copy. We're going to muck around with this and we should never mutate the originals
  128. resourceQuotas = append(resourceQuotas, *quota)
  129. }
  130. return resourceQuotas, nil
  131. }