limitenforcer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package eventratelimit
  14. import (
  15. "fmt"
  16. "strings"
  17. "github.com/hashicorp/golang-lru"
  18. "k8s.io/apiserver/pkg/admission"
  19. "k8s.io/client-go/util/flowcontrol"
  20. api "k8s.io/kubernetes/pkg/apis/core"
  21. eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
  22. )
  23. const (
  24. // cache size to use if the user did not specify a cache size
  25. defaultCacheSize = 4096
  26. )
  27. // limitEnforcer enforces a single type of event rate limit, such as server, namespace, or source+object
  28. type limitEnforcer struct {
  29. // type of this limit
  30. limitType eventratelimitapi.LimitType
  31. // cache for holding the rate limiters
  32. cache cache
  33. // a keyFunc which is responsible for computing a single key based on input
  34. keyFunc func(admission.Attributes) string
  35. }
  36. func newLimitEnforcer(config eventratelimitapi.Limit, clock flowcontrol.Clock) (*limitEnforcer, error) {
  37. rateLimiterFactory := func() flowcontrol.RateLimiter {
  38. return flowcontrol.NewTokenBucketRateLimiterWithClock(float32(config.QPS), int(config.Burst), clock)
  39. }
  40. if config.Type == eventratelimitapi.ServerLimitType {
  41. return &limitEnforcer{
  42. limitType: config.Type,
  43. cache: &singleCache{
  44. rateLimiter: rateLimiterFactory(),
  45. },
  46. keyFunc: getServerKey,
  47. }, nil
  48. }
  49. cacheSize := int(config.CacheSize)
  50. if cacheSize == 0 {
  51. cacheSize = defaultCacheSize
  52. }
  53. underlyingCache, err := lru.New(cacheSize)
  54. if err != nil {
  55. return nil, fmt.Errorf("could not create lru cache: %v", err)
  56. }
  57. cache := &lruCache{
  58. rateLimiterFactory: rateLimiterFactory,
  59. cache: underlyingCache,
  60. }
  61. var keyFunc func(admission.Attributes) string
  62. switch t := config.Type; t {
  63. case eventratelimitapi.NamespaceLimitType:
  64. keyFunc = getNamespaceKey
  65. case eventratelimitapi.UserLimitType:
  66. keyFunc = getUserKey
  67. case eventratelimitapi.SourceAndObjectLimitType:
  68. keyFunc = getSourceAndObjectKey
  69. default:
  70. return nil, fmt.Errorf("unknown event rate limit type: %v", t)
  71. }
  72. return &limitEnforcer{
  73. limitType: config.Type,
  74. cache: cache,
  75. keyFunc: keyFunc,
  76. }, nil
  77. }
  78. func (enforcer *limitEnforcer) accept(attr admission.Attributes) error {
  79. key := enforcer.keyFunc(attr)
  80. rateLimiter := enforcer.cache.get(key)
  81. // ensure we have available rate
  82. allow := rateLimiter.TryAccept()
  83. if !allow {
  84. return fmt.Errorf("limit reached on type %v for key %v", enforcer.limitType, key)
  85. }
  86. return nil
  87. }
  88. func getServerKey(attr admission.Attributes) string {
  89. return ""
  90. }
  91. // getNamespaceKey returns a cache key that is based on the namespace of the event request
  92. func getNamespaceKey(attr admission.Attributes) string {
  93. return attr.GetNamespace()
  94. }
  95. // getUserKey returns a cache key that is based on the user of the event request
  96. func getUserKey(attr admission.Attributes) string {
  97. userInfo := attr.GetUserInfo()
  98. if userInfo == nil {
  99. return ""
  100. }
  101. return userInfo.GetName()
  102. }
  103. // getSourceAndObjectKey returns a cache key that is based on the source+object of the event
  104. func getSourceAndObjectKey(attr admission.Attributes) string {
  105. object := attr.GetObject()
  106. if object == nil {
  107. return ""
  108. }
  109. event, ok := object.(*api.Event)
  110. if !ok {
  111. return ""
  112. }
  113. return strings.Join([]string{
  114. event.Source.Component,
  115. event.Source.Host,
  116. event.InvolvedObject.Kind,
  117. event.InvolvedObject.Namespace,
  118. event.InvolvedObject.Name,
  119. string(event.InvolvedObject.UID),
  120. event.InvolvedObject.APIVersion,
  121. }, "")
  122. }