trigger_time_tracker.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpoint
  14. import (
  15. "sync"
  16. "time"
  17. "k8s.io/api/core/v1"
  18. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  19. )
  20. // TriggerTimeTracker is a util used to compute the EndpointsLastChangeTriggerTime annotation which
  21. // is exported in the endpoints controller's sync function.
  22. // See the documentation of the EndpointsLastChangeTriggerTime annotation for more details.
  23. //
  24. // Please note that this util may compute a wrong EndpointsLastChangeTriggerTime if a same object
  25. // changes multiple times between two consecutive syncs. We're aware of this limitation but we
  26. // decided to accept it, as fixing it would require a major rewrite of the endpoints controller and
  27. // Informer framework. Such situations, i.e. frequent updates of the same object in a single sync
  28. // period, should be relatively rare and therefore this util should provide a good approximation of
  29. // the EndpointsLastChangeTriggerTime.
  30. // TODO(mm4tt): Implement a more robust mechanism that is not subject to the above limitations.
  31. type TriggerTimeTracker struct {
  32. // endpointsStates is a map, indexed by Endpoints object key, storing the last known Endpoints
  33. // object state observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime
  34. // function.
  35. endpointsStates map[endpointsKey]endpointsState
  36. // mutex guarding the endpointsStates map.
  37. mutex sync.Mutex
  38. }
  39. // NewTriggerTimeTracker creates a new instance of the TriggerTimeTracker.
  40. func NewTriggerTimeTracker() *TriggerTimeTracker {
  41. return &TriggerTimeTracker{
  42. endpointsStates: make(map[endpointsKey]endpointsState),
  43. }
  44. }
  // endpointsKey uniquely identifies an Endpoints object by its namespaced name. It is a
  // comparable value type and is used as the key of the tracker's state map.
  type endpointsKey struct {
  	// namespace is the namespace the Endpoints object lives in.
  	namespace string
  	// name is the name of the Endpoints object within that namespace.
  	name string
  }
  // endpointsState is the snapshot of a single Endpoints object that this util remembers between
  // two calls of ComputeEndpointsLastChangeTriggerTime.
  type endpointsState struct {
  	// lastServiceTriggerTime is the most recently observed service trigger time.
  	lastServiceTriggerTime time.Time

  	// lastPodTriggerTimes maps a Pod name to the pod trigger time that was observed for it
  	// during the most recent call of ComputeEndpointsLastChangeTriggerTime.
  	lastPodTriggerTimes map[string]time.Time
  }
  58. // ComputeEndpointsLastChangeTriggerTime updates the state of the Endpoints object being synced
  59. // and returns the time that should be exported as the EndpointsLastChangeTriggerTime annotation.
  60. //
  61. // If the method returns a 'zero' time the EndpointsLastChangeTriggerTime annotation shouldn't be
  62. // exported.
  63. //
  64. // Please note that this function may compute a wrong EndpointsLastChangeTriggerTime value if the
  65. // same object (pod/service) changes multiple times between two consecutive syncs.
  66. //
  67. // Important: This method is go-routing safe but only when called for different keys. The method
  68. // shouldn't be called concurrently for the same key! This contract is fulfilled in the current
  69. // implementation of the endpoints controller.
  70. func (t *TriggerTimeTracker) ComputeEndpointsLastChangeTriggerTime(
  71. namespace, name string, service *v1.Service, pods []*v1.Pod) time.Time {
  72. key := endpointsKey{namespace: namespace, name: name}
  73. // As there won't be any concurrent calls for the same key, we need to guard access only to the
  74. // endpointsStates map.
  75. t.mutex.Lock()
  76. state, wasKnown := t.endpointsStates[key]
  77. t.mutex.Unlock()
  78. // Update the state before returning.
  79. defer func() {
  80. t.mutex.Lock()
  81. t.endpointsStates[key] = state
  82. t.mutex.Unlock()
  83. }()
  84. // minChangedTriggerTime is the min trigger time of all trigger times that have changed since the
  85. // last sync.
  86. var minChangedTriggerTime time.Time
  87. // TODO(mm4tt): If memory allocation / GC performance impact of recreating map in every call
  88. // turns out to be too expensive, we should consider rewriting this to reuse the existing map.
  89. podTriggerTimes := make(map[string]time.Time)
  90. for _, pod := range pods {
  91. if podTriggerTime := getPodTriggerTime(pod); !podTriggerTime.IsZero() {
  92. podTriggerTimes[pod.Name] = podTriggerTime
  93. if podTriggerTime.After(state.lastPodTriggerTimes[pod.Name]) {
  94. // Pod trigger time has changed since the last sync, update minChangedTriggerTime.
  95. minChangedTriggerTime = min(minChangedTriggerTime, podTriggerTime)
  96. }
  97. }
  98. }
  99. serviceTriggerTime := getServiceTriggerTime(service)
  100. if serviceTriggerTime.After(state.lastServiceTriggerTime) {
  101. // Service trigger time has changed since the last sync, update minChangedTriggerTime.
  102. minChangedTriggerTime = min(minChangedTriggerTime, serviceTriggerTime)
  103. }
  104. state.lastPodTriggerTimes = podTriggerTimes
  105. state.lastServiceTriggerTime = serviceTriggerTime
  106. if !wasKnown {
  107. // New Endpoints object / new Service, use Service creationTimestamp.
  108. return service.CreationTimestamp.Time
  109. } else {
  110. // Regular update of the Endpoints object, return min of changed trigger times.
  111. return minChangedTriggerTime
  112. }
  113. }
  114. // DeleteEndpoints deletes endpoints state stored in this util.
  115. func (t *TriggerTimeTracker) DeleteEndpoints(namespace, name string) {
  116. key := endpointsKey{namespace: namespace, name: name}
  117. t.mutex.Lock()
  118. defer t.mutex.Unlock()
  119. delete(t.endpointsStates, key)
  120. }
  121. // getPodTriggerTime returns the time of the pod change (trigger) that resulted or will result in
  122. // the endpoints object change.
  123. func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) {
  124. if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil {
  125. triggerTime = readyCondition.LastTransitionTime.Time
  126. }
  127. // TODO(mm4tt): Implement missing cases: deletionTime set, pod label change
  128. return triggerTime
  129. }
  130. // getServiceTriggerTime returns the time of the service change (trigger) that resulted or will
  131. // result in the endpoints object change.
  132. func getServiceTriggerTime(service *v1.Service) (triggerTime time.Time) {
  133. // TODO(mm4tt): Ideally we should look at service.LastUpdateTime, but such thing doesn't exist.
  134. return service.CreationTimestamp.Time
  135. }
  // min returns the earlier of currentMin and newValue. A zero currentMin means "not set yet", in
  // which case newValue is returned unconditionally.
  func min(currentMin, newValue time.Time) time.Time {
  	switch {
  	case currentMin.IsZero():
  		// No minimum recorded so far; the new value becomes the minimum.
  		return newValue
  	case newValue.Before(currentMin):
  		return newValue
  	default:
  		return currentMin
  	}
  }