controller_utils.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpoint
  14. import (
  15. "crypto/md5"
  16. "encoding/hex"
  17. "fmt"
  18. "reflect"
  19. "sort"
  20. "sync"
  21. v1 "k8s.io/api/core/v1"
  22. discovery "k8s.io/api/discovery/v1beta1"
  23. "k8s.io/apimachinery/pkg/labels"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/sets"
  26. v1listers "k8s.io/client-go/listers/core/v1"
  27. "k8s.io/client-go/tools/cache"
  28. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  29. "k8s.io/kubernetes/pkg/controller"
  30. "k8s.io/kubernetes/pkg/util/hash"
  31. )
  32. // ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
  33. type ServiceSelectorCache struct {
  34. lock sync.RWMutex
  35. cache map[string]labels.Selector
  36. }
  37. // NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
  38. func NewServiceSelectorCache() *ServiceSelectorCache {
  39. return &ServiceSelectorCache{
  40. cache: map[string]labels.Selector{},
  41. }
  42. }
  43. // Get return selector and existence in ServiceSelectorCache by key.
  44. func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
  45. sc.lock.RLock()
  46. selector, ok := sc.cache[key]
  47. // fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
  48. sc.lock.RUnlock()
  49. return selector, ok
  50. }
  51. // Update can update or add a selector in ServiceSelectorCache while service's selector changed.
  52. func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
  53. sc.lock.Lock()
  54. defer sc.lock.Unlock()
  55. selector := labels.Set(rawSelector).AsSelectorPreValidated()
  56. sc.cache[key] = selector
  57. return selector
  58. }
  59. // Delete can delete selector which exist in ServiceSelectorCache.
  60. func (sc *ServiceSelectorCache) Delete(key string) {
  61. sc.lock.Lock()
  62. defer sc.lock.Unlock()
  63. delete(sc.cache, key)
  64. }
  65. // GetPodServiceMemberships returns a set of Service keys for Services that have
  66. // a selector matching the given pod.
  67. func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
  68. set := sets.String{}
  69. services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
  70. if err != nil {
  71. return set, err
  72. }
  73. var selector labels.Selector
  74. for _, service := range services {
  75. if service.Spec.Selector == nil {
  76. // if the service has a nil selector this means selectors match nothing, not everything.
  77. continue
  78. }
  79. key, err := controller.KeyFunc(service)
  80. if err != nil {
  81. return nil, err
  82. }
  83. if v, ok := sc.Get(key); ok {
  84. selector = v
  85. } else {
  86. selector = sc.Update(key, service.Spec.Selector)
  87. }
  88. if selector.Matches(labels.Set(pod.Labels)) {
  89. set.Insert(key)
  90. }
  91. }
  92. return set, nil
  93. }
  94. // EndpointsMatch is a type of function that returns true if pod endpoints match.
  95. type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
  96. // PortMapKey is used to uniquely identify groups of endpoint ports.
  97. type PortMapKey string
  98. // NewPortMapKey generates a PortMapKey from endpoint ports.
  99. func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey {
  100. sort.Sort(portsInOrder(endpointPorts))
  101. return PortMapKey(DeepHashObjectToString(endpointPorts))
  102. }
  103. // DeepHashObjectToString creates a unique hash string from a go object.
  104. func DeepHashObjectToString(objectToWrite interface{}) string {
  105. hasher := md5.New()
  106. hash.DeepHashObject(hasher, objectToWrite)
  107. return hex.EncodeToString(hasher.Sum(nil)[0:])
  108. }
  109. // ShouldPodBeInEndpoints returns true if a specified pod should be in an
  110. // endpoints object.
  111. func ShouldPodBeInEndpoints(pod *v1.Pod) bool {
  112. if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
  113. return false
  114. }
  115. if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
  116. return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
  117. }
  118. if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
  119. return pod.Status.Phase != v1.PodSucceeded
  120. }
  121. return true
  122. }
  123. // ShouldSetHostname returns true if the Hostname attribute should be set on an
  124. // Endpoints Address or EndpointSlice Endpoint.
  125. func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
  126. return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
  127. }
  128. // PodChanged returns two boolean values, the first returns true if the pod.
  129. // has changed, the second value returns true if the pod labels have changed.
  130. func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) {
  131. // Check if the pod labels have changed, indicating a possible
  132. // change in the service membership
  133. labelsChanged := false
  134. if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
  135. !hostNameAndDomainAreEqual(newPod, oldPod) {
  136. labelsChanged = true
  137. }
  138. // If the pod's deletion timestamp is set, remove endpoint from ready address.
  139. if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
  140. return true, labelsChanged
  141. }
  142. // If the pod's readiness has changed, the associated endpoint address
  143. // will move from the unready endpoints set to the ready endpoints.
  144. // So for the purposes of an endpoint, a readiness change on a pod
  145. // means we have a changed pod.
  146. if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
  147. return true, labelsChanged
  148. }
  149. // Convert the pod to an Endpoint, clear inert fields,
  150. // and see if they are the same.
  151. // TODO: Add a watcher for node changes separate from this
  152. // We don't want to trigger multiple syncs at a pod level when a node changes
  153. return endpointChanged(newPod, oldPod), labelsChanged
  154. }
  155. // GetServicesToUpdateOnPodChange returns a set of Service keys for Services
  156. // that have potentially been affected by a change to this pod.
  157. func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String {
  158. newPod := cur.(*v1.Pod)
  159. oldPod := old.(*v1.Pod)
  160. if newPod.ResourceVersion == oldPod.ResourceVersion {
  161. // Periodic resync will send update events for all known pods.
  162. // Two different versions of the same pod will always have different RVs
  163. return sets.String{}
  164. }
  165. podChanged, labelsChanged := PodChanged(oldPod, newPod, endpointChanged)
  166. // If both the pod and labels are unchanged, no update is needed
  167. if !podChanged && !labelsChanged {
  168. return sets.String{}
  169. }
  170. services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
  171. if err != nil {
  172. utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
  173. return sets.String{}
  174. }
  175. if labelsChanged {
  176. oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
  177. if err != nil {
  178. utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
  179. }
  180. services = determineNeededServiceUpdates(oldServices, services, podChanged)
  181. }
  182. return services
  183. }
  184. // GetPodFromDeleteAction returns a pointer to a pod if one can be derived from
  185. // obj (could be a *v1.Pod, or a DeletionFinalStateUnknown marker item).
  186. func GetPodFromDeleteAction(obj interface{}) *v1.Pod {
  187. if pod, ok := obj.(*v1.Pod); ok {
  188. // Enqueue all the services that the pod used to be a member of.
  189. // This is the same thing we do when we add a pod.
  190. return pod
  191. }
  192. // If we reached here it means the pod was deleted but its final state is unrecorded.
  193. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  194. if !ok {
  195. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  196. return nil
  197. }
  198. pod, ok := tombstone.Obj.(*v1.Pod)
  199. if !ok {
  200. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
  201. return nil
  202. }
  203. return pod
  204. }
  205. func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
  206. return pod1.Spec.Hostname == pod2.Spec.Hostname &&
  207. pod1.Spec.Subdomain == pod2.Spec.Subdomain
  208. }
  209. func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
  210. if podChanged {
  211. // if the labels and pod changed, all services need to be updated
  212. services = services.Union(oldServices)
  213. } else {
  214. // if only the labels changed, services not common to both the new
  215. // and old service set (the disjuntive union) need to be updated
  216. services = services.Difference(oldServices).Union(oldServices.Difference(services))
  217. }
  218. return services
  219. }
  220. // portsInOrder helps sort endpoint ports in a consistent way for hashing.
  221. type portsInOrder []discovery.EndpointPort
  222. func (sl portsInOrder) Len() int { return len(sl) }
  223. func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
  224. func (sl portsInOrder) Less(i, j int) bool {
  225. h1 := DeepHashObjectToString(sl[i])
  226. h2 := DeepHashObjectToString(sl[j])
  227. return h1 < h2
  228. }