123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package endpoint
- import (
- "crypto/md5"
- "encoding/hex"
- "fmt"
- "reflect"
- "sort"
- "sync"
- v1 "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- "k8s.io/apimachinery/pkg/labels"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- v1listers "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- podutil "k8s.io/kubernetes/pkg/api/v1/pod"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/util/hash"
- )
- // ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
- type ServiceSelectorCache struct {
- lock sync.RWMutex
- cache map[string]labels.Selector
- }
- // NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
- func NewServiceSelectorCache() *ServiceSelectorCache {
- return &ServiceSelectorCache{
- cache: map[string]labels.Selector{},
- }
- }
- // Get return selector and existence in ServiceSelectorCache by key.
- func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
- sc.lock.RLock()
- selector, ok := sc.cache[key]
- // fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
- sc.lock.RUnlock()
- return selector, ok
- }
- // Update can update or add a selector in ServiceSelectorCache while service's selector changed.
- func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
- sc.lock.Lock()
- defer sc.lock.Unlock()
- selector := labels.Set(rawSelector).AsSelectorPreValidated()
- sc.cache[key] = selector
- return selector
- }
- // Delete can delete selector which exist in ServiceSelectorCache.
- func (sc *ServiceSelectorCache) Delete(key string) {
- sc.lock.Lock()
- defer sc.lock.Unlock()
- delete(sc.cache, key)
- }
- // GetPodServiceMemberships returns a set of Service keys for Services that have
- // a selector matching the given pod.
- func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
- set := sets.String{}
- services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
- if err != nil {
- return set, err
- }
- var selector labels.Selector
- for _, service := range services {
- if service.Spec.Selector == nil {
- // if the service has a nil selector this means selectors match nothing, not everything.
- continue
- }
- key, err := controller.KeyFunc(service)
- if err != nil {
- return nil, err
- }
- if v, ok := sc.Get(key); ok {
- selector = v
- } else {
- selector = sc.Update(key, service.Spec.Selector)
- }
- if selector.Matches(labels.Set(pod.Labels)) {
- set.Insert(key)
- }
- }
- return set, nil
- }
- // EndpointsMatch is a type of function that returns true if pod endpoints match.
- type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
// PortMapKey identifies a group of endpoint ports. Values are produced by
// NewPortMapKey, which derives the key from a hash of the ports.
type PortMapKey string
- // NewPortMapKey generates a PortMapKey from endpoint ports.
- func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey {
- sort.Sort(portsInOrder(endpointPorts))
- return PortMapKey(DeepHashObjectToString(endpointPorts))
- }
- // DeepHashObjectToString creates a unique hash string from a go object.
- func DeepHashObjectToString(objectToWrite interface{}) string {
- hasher := md5.New()
- hash.DeepHashObject(hasher, objectToWrite)
- return hex.EncodeToString(hasher.Sum(nil)[0:])
- }
- // ShouldPodBeInEndpoints returns true if a specified pod should be in an
- // endpoints object.
- func ShouldPodBeInEndpoints(pod *v1.Pod) bool {
- if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
- return false
- }
- if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
- return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
- }
- if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
- return pod.Status.Phase != v1.PodSucceeded
- }
- return true
- }
- // ShouldSetHostname returns true if the Hostname attribute should be set on an
- // Endpoints Address or EndpointSlice Endpoint.
- func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
- return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
- }
- // PodChanged returns two boolean values, the first returns true if the pod.
- // has changed, the second value returns true if the pod labels have changed.
- func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, bool) {
- // Check if the pod labels have changed, indicating a possible
- // change in the service membership
- labelsChanged := false
- if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
- !hostNameAndDomainAreEqual(newPod, oldPod) {
- labelsChanged = true
- }
- // If the pod's deletion timestamp is set, remove endpoint from ready address.
- if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
- return true, labelsChanged
- }
- // If the pod's readiness has changed, the associated endpoint address
- // will move from the unready endpoints set to the ready endpoints.
- // So for the purposes of an endpoint, a readiness change on a pod
- // means we have a changed pod.
- if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
- return true, labelsChanged
- }
- // Convert the pod to an Endpoint, clear inert fields,
- // and see if they are the same.
- // TODO: Add a watcher for node changes separate from this
- // We don't want to trigger multiple syncs at a pod level when a node changes
- return endpointChanged(newPod, oldPod), labelsChanged
- }
- // GetServicesToUpdateOnPodChange returns a set of Service keys for Services
- // that have potentially been affected by a change to this pod.
- func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String {
- newPod := cur.(*v1.Pod)
- oldPod := old.(*v1.Pod)
- if newPod.ResourceVersion == oldPod.ResourceVersion {
- // Periodic resync will send update events for all known pods.
- // Two different versions of the same pod will always have different RVs
- return sets.String{}
- }
- podChanged, labelsChanged := PodChanged(oldPod, newPod, endpointChanged)
- // If both the pod and labels are unchanged, no update is needed
- if !podChanged && !labelsChanged {
- return sets.String{}
- }
- services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
- return sets.String{}
- }
- if labelsChanged {
- oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
- }
- services = determineNeededServiceUpdates(oldServices, services, podChanged)
- }
- return services
- }
- // GetPodFromDeleteAction returns a pointer to a pod if one can be derived from
- // obj (could be a *v1.Pod, or a DeletionFinalStateUnknown marker item).
- func GetPodFromDeleteAction(obj interface{}) *v1.Pod {
- if pod, ok := obj.(*v1.Pod); ok {
- // Enqueue all the services that the pod used to be a member of.
- // This is the same thing we do when we add a pod.
- return pod
- }
- // If we reached here it means the pod was deleted but its final state is unrecorded.
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
- return nil
- }
- pod, ok := tombstone.Obj.(*v1.Pod)
- if !ok {
- utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
- return nil
- }
- return pod
- }
- func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
- return pod1.Spec.Hostname == pod2.Spec.Hostname &&
- pod1.Spec.Subdomain == pod2.Spec.Subdomain
- }
- func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
- if podChanged {
- // if the labels and pod changed, all services need to be updated
- services = services.Union(oldServices)
- } else {
- // if only the labels changed, services not common to both the new
- // and old service set (the disjuntive union) need to be updated
- services = services.Difference(oldServices).Union(oldServices.Difference(services))
- }
- return services
- }
- // portsInOrder helps sort endpoint ports in a consistent way for hashing.
- type portsInOrder []discovery.EndpointPort
- func (sl portsInOrder) Len() int { return len(sl) }
- func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
- func (sl portsInOrder) Less(i, j int) bool {
- h1 := DeepHashObjectToString(sl[i])
- h2 := DeepHashObjectToString(sl[j])
- return h1 < h2
- }
|