cache.go

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
	"math"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

// NewCache returns a new Cache with the specified endpointsPerSlice.
func NewCache(endpointsPerSlice int32) *Cache {
	return &Cache{
		maxEndpointsPerSlice: endpointsPerSlice,
		cache:                map[types.NamespacedName]*ServicePortCache{},
	}
}

// Cache tracks values for total numbers of desired endpoints as well as the
// efficiency of EndpointSlice endpoints distribution.
type Cache struct {
	// maxEndpointsPerSlice references the maximum number of endpoints that
	// should be added to an EndpointSlice.
	maxEndpointsPerSlice int32

	// lock protects changes to numEndpoints and cache.
	lock sync.Mutex
	// numEndpoints represents the total number of endpoints stored in
	// EndpointSlices.
	numEndpoints int
	// cache stores a ServicePortCache grouped by NamespacedNames representing
	// Services.
	cache map[types.NamespacedName]*ServicePortCache
}

// ServicePortCache tracks values for total numbers of desired endpoints as well
// as the efficiency of EndpointSlice endpoints distribution for each unique
// Service Port combination.
type ServicePortCache struct {
	items map[endpointutil.PortMapKey]EfficiencyInfo
}

// EfficiencyInfo stores the number of Endpoints and Slices for calculating
// total numbers of desired endpoints and the efficiency of EndpointSlice
// endpoints distribution.
type EfficiencyInfo struct {
	Endpoints int
	Slices    int
}

// NewServicePortCache initializes and returns a new ServicePortCache.
func NewServicePortCache() *ServicePortCache {
	return &ServicePortCache{
		items: map[endpointutil.PortMapKey]EfficiencyInfo{},
	}
}

// Set updates the ServicePortCache to contain the provided EfficiencyInfo
// for the provided PortMapKey.
func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo EfficiencyInfo) {
	spc.items[pmKey] = eInfo
}

// numEndpoints returns the total number of endpoints represented by a
// ServicePortCache.
func (spc *ServicePortCache) numEndpoints() int {
	num := 0
	for _, eInfo := range spc.items {
		num += eInfo.Endpoints
	}
	return num
}

// UpdateServicePortCache updates a ServicePortCache in the global cache for a
// given Service and updates the corresponding metrics.
// Parameters:
// * serviceNN refers to a NamespacedName representing the Service.
// * spCache refers to a ServicePortCache for the specified Service.
func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *ServicePortCache) {
	c.lock.Lock()
	defer c.lock.Unlock()

	prevNumEndpoints := 0
	if existingSPCache, ok := c.cache[serviceNN]; ok {
		prevNumEndpoints = existingSPCache.numEndpoints()
	}

	currNumEndpoints := spCache.numEndpoints()

	// To keep numEndpoints up to date, add the difference between the number of
	// endpoints in the provided spCache and any spCache it might be replacing.
	c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints

	c.cache[serviceNN] = spCache
	c.updateMetrics()
}
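
// The sketch below is added for illustration and is not part of the original
// file. It shows how a caller might use the API above: build a fresh
// ServicePortCache for a Service during a sync, Set an EfficiencyInfo per
// unique port combination, and hand the result to UpdateServicePortCache,
// which replaces any previous entry for that Service and refreshes the
// metrics. The pmKeys and eInfos arguments are hypothetical stand-ins for the
// per-port data a reconciler would collect; keys would typically be derived
// from an EndpointSlice's port list (for example via
// endpointutil.NewPortMapKey, assumed here).
func recordServiceState(c *Cache, serviceNN types.NamespacedName, pmKeys []endpointutil.PortMapKey, eInfos []EfficiencyInfo) {
	spCache := NewServicePortCache()
	for i, pmKey := range pmKeys {
		spCache.Set(pmKey, eInfos[i])
	}
	c.UpdateServicePortCache(serviceNN, spCache)
}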

// DeleteService removes references of a Service from the global cache and
// updates the corresponding metrics.
func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if spCache, ok := c.cache[serviceNN]; ok {
		c.numEndpoints = c.numEndpoints - spCache.numEndpoints()
		delete(c.cache, serviceNN)
		c.updateMetrics()
	}
}

// metricsUpdate stores a desired and actual number of EndpointSlices.
type metricsUpdate struct {
	desired, actual int
}

// desiredAndActualSlices returns a metricsUpdate with the desired and actual
// number of EndpointSlices given the current values in the cache.
// Must be called holding lock.
func (c *Cache) desiredAndActualSlices() metricsUpdate {
	mUpdate := metricsUpdate{}
	for _, spCache := range c.cache {
		for _, eInfo := range spCache.items {
			mUpdate.actual += eInfo.Slices
			mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice))
		}
	}
	return mUpdate
}

// updateMetrics updates metrics with the values from this Cache.
// Must be called holding lock.
func (c *Cache) updateMetrics() {
	mUpdate := c.desiredAndActualSlices()
	NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual))
	DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired))
	EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
}

// numDesiredSlices calculates the number of EndpointSlices that would exist
// with ideal endpoint distribution.
func numDesiredSlices(numEndpoints, maxPerSlice int) int {
	if numEndpoints <= maxPerSlice {
		return 1
	}
	return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice)))
}
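
// Worked example (added for illustration; not in the original file): with
// maxPerSlice = 100, numDesiredSlices(80, 100) and numDesiredSlices(100, 100)
// both return 1, numDesiredSlices(101, 100) returns ceil(101/100) = 2, and
// numDesiredSlices(250, 100) returns ceil(250/100) = 3. Note that 0 endpoints
// still yields 1 desired slice.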
  137. }