|
- package endpointslice
- import (
- "context"
- "fmt"
- "sort"
- "time"
- corev1 "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- utilerrors "k8s.io/apimachinery/pkg/util/errors"
- "k8s.io/apimachinery/pkg/util/sets"
- clientset "k8s.io/client-go/kubernetes"
- corelisters "k8s.io/client-go/listers/core/v1"
- "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
- endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
- )
- type reconciler struct {
- client clientset.Interface
- nodeLister corelisters.NodeLister
- maxEndpointsPerSlice int32
- endpointSliceTracker *endpointSliceTracker
- metricsCache *metrics.Cache
- }
- type endpointMeta struct {
- Ports []discovery.EndpointPort `json:"ports" protobuf:"bytes,2,rep,name=ports"`
- AddressType discovery.AddressType `json:"addressType" protobuf:"bytes,3,rep,name=addressType"`
- }
- func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
- addressType := discovery.AddressTypeIPv4
- if isIPv6Service(service) {
- addressType = discovery.AddressTypeIPv6
- }
- slicesToCreate := []*discovery.EndpointSlice{}
- slicesToUpdate := []*discovery.EndpointSlice{}
- slicesToDelete := []*discovery.EndpointSlice{}
-
- existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
- numExistingEndpoints := 0
- for _, existingSlice := range existingSlices {
- if existingSlice.AddressType == addressType {
- epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
- existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
- numExistingEndpoints += len(existingSlice.Endpoints)
- } else {
- slicesToDelete = append(slicesToDelete, existingSlice)
- }
- }
-
- desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
- desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointSet{}
- numDesiredEndpoints := 0
- for _, pod := range pods {
- if endpointutil.ShouldPodBeInEndpoints(pod) {
- endpointPorts := getEndpointPorts(service, pod)
- epHash := endpointutil.NewPortMapKey(endpointPorts)
- if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
- desiredEndpointsByPortMap[epHash] = endpointSet{}
- }
- if _, ok := desiredMetaByPortMap[epHash]; !ok {
- desiredMetaByPortMap[epHash] = &endpointMeta{
- AddressType: addressType,
- Ports: endpointPorts,
- }
- }
- node, err := r.nodeLister.Get(pod.Spec.NodeName)
- if err != nil {
- return err
- }
- endpoint := podToEndpoint(pod, node, service)
- if len(endpoint.Addresses) > 0 {
- desiredEndpointsByPortMap[epHash].Insert(&endpoint)
- numDesiredEndpoints++
- }
- }
- }
- spMetrics := metrics.NewServicePortCache()
- totalAdded := 0
- totalRemoved := 0
-
- for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
- numEndpoints := len(desiredEndpoints)
- pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
- service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])
- totalAdded += added
- totalRemoved += removed
- spMetrics.Set(portMap, metrics.EfficiencyInfo{
- Endpoints: numEndpoints,
- Slices: len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
- })
- if len(pmSlicesToCreate) > 0 {
- slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
- }
- if len(pmSlicesToUpdate) > 0 {
- slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
- }
- if len(pmSlicesToDelete) > 0 {
- slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
- }
- }
-
-
- for portMap, existingSlices := range existingSlicesByPortMap {
- if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
- for _, existingSlice := range existingSlices {
- slicesToDelete = append(slicesToDelete, existingSlice)
- }
- }
- }
-
- if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
- placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: addressType})
- slicesToCreate = append(slicesToCreate, placeholderSlice)
- spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
- Endpoints: 0,
- Slices: 1,
- })
- }
- metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
- metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))
- serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
- r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
- return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
- }
- func (r *reconciler) finalize(
- service *corev1.Service,
- slicesToCreate,
- slicesToUpdate,
- slicesToDelete []*discovery.EndpointSlice,
- triggerTime time.Time,
- ) error {
- errs := []error{}
-
-
- for i := 0; i < len(slicesToDelete); {
- if len(slicesToCreate) == 0 {
- break
- }
- sliceToDelete := slicesToDelete[i]
- slice := slicesToCreate[len(slicesToCreate)-1]
-
-
-
-
-
-
- if sliceToDelete.AddressType == slice.AddressType {
- slice.Name = sliceToDelete.Name
- slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
- slicesToUpdate = append(slicesToUpdate, slice)
- slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
- } else {
- i++
- }
- }
- for _, endpointSlice := range slicesToCreate {
- addTriggerTimeAnnotation(endpointSlice, triggerTime)
- createdSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
- if err != nil {
-
- if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
- return nil
- }
- errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
- } else {
- r.endpointSliceTracker.Update(createdSlice)
- metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
- }
- }
- for _, endpointSlice := range slicesToUpdate {
- addTriggerTimeAnnotation(endpointSlice, triggerTime)
- updatedSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
- if err != nil {
- errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
- } else {
- r.endpointSliceTracker.Update(updatedSlice)
- metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
- }
- }
- for _, endpointSlice := range slicesToDelete {
- err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, &metav1.DeleteOptions{})
- if err != nil {
- errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
- } else {
- r.endpointSliceTracker.Delete(endpointSlice)
- metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
- }
- }
- return utilerrors.NewAggregate(errs)
- }
- func (r *reconciler) reconcileByPortMapping(
- service *corev1.Service,
- existingSlices []*discovery.EndpointSlice,
- desiredSet endpointSet,
- endpointMeta *endpointMeta,
- ) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, []*discovery.EndpointSlice, int, int) {
- slicesByName := map[string]*discovery.EndpointSlice{}
- sliceNamesUnchanged := sets.String{}
- sliceNamesToUpdate := sets.String{}
- sliceNamesToDelete := sets.String{}
- numRemoved := 0
-
-
- for _, existingSlice := range existingSlices {
- slicesByName[existingSlice.Name] = existingSlice
- newEndpoints := []discovery.Endpoint{}
- endpointUpdated := false
- for _, endpoint := range existingSlice.Endpoints {
- got := desiredSet.Get(&endpoint)
-
- if got != nil {
- newEndpoints = append(newEndpoints, *got)
-
-
- if !endpointsEqualBeyondHash(got, &endpoint) {
- endpointUpdated = true
- }
-
-
- desiredSet.Delete(&endpoint)
- }
- }
-
- if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
- if len(existingSlice.Endpoints) > len(newEndpoints) {
- numRemoved += len(existingSlice.Endpoints) - len(newEndpoints)
- }
- if len(newEndpoints) == 0 {
-
- sliceNamesToDelete.Insert(existingSlice.Name)
- } else {
-
- epSlice := existingSlice.DeepCopy()
- epSlice.Endpoints = newEndpoints
- slicesByName[existingSlice.Name] = epSlice
- sliceNamesToUpdate.Insert(epSlice.Name)
- }
- } else {
-
- sliceNamesUnchanged.Insert(existingSlice.Name)
- }
- }
- numAdded := desiredSet.Len()
-
-
- if desiredSet.Len() > 0 && sliceNamesToUpdate.Len() > 0 {
- slices := []*discovery.EndpointSlice{}
- for _, sliceName := range sliceNamesToUpdate.UnsortedList() {
- slices = append(slices, slicesByName[sliceName])
- }
-
-
- sort.Sort(endpointSliceEndpointLen(slices))
-
- for _, slice := range slices {
- for desiredSet.Len() > 0 && len(slice.Endpoints) < int(r.maxEndpointsPerSlice) {
- endpoint, _ := desiredSet.PopAny()
- slice.Endpoints = append(slice.Endpoints, *endpoint)
- }
- }
- }
-
-
-
- slicesToCreate := []*discovery.EndpointSlice{}
- for desiredSet.Len() > 0 {
- var sliceToFill *discovery.EndpointSlice
-
-
-
- if desiredSet.Len() < int(r.maxEndpointsPerSlice) && sliceNamesUnchanged.Len() > 0 {
- unchangedSlices := []*discovery.EndpointSlice{}
- for _, sliceName := range sliceNamesUnchanged.UnsortedList() {
- unchangedSlices = append(unchangedSlices, slicesByName[sliceName])
- }
- sliceToFill = getSliceToFill(unchangedSlices, desiredSet.Len(), int(r.maxEndpointsPerSlice))
- }
-
- if sliceToFill == nil {
- sliceToFill = newEndpointSlice(service, endpointMeta)
- } else {
-
- sliceToFill = sliceToFill.DeepCopy()
- slicesByName[sliceToFill.Name] = sliceToFill
- }
-
- for desiredSet.Len() > 0 && len(sliceToFill.Endpoints) < int(r.maxEndpointsPerSlice) {
- endpoint, _ := desiredSet.PopAny()
- sliceToFill.Endpoints = append(sliceToFill.Endpoints, *endpoint)
- }
-
-
- if sliceToFill.Name != "" {
- sliceNamesToUpdate.Insert(sliceToFill.Name)
- sliceNamesUnchanged.Delete(sliceToFill.Name)
- } else {
- slicesToCreate = append(slicesToCreate, sliceToFill)
- }
- }
-
- slicesToUpdate := []*discovery.EndpointSlice{}
- for _, sliceName := range sliceNamesToUpdate.UnsortedList() {
- slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName])
- }
-
- slicesToDelete := []*discovery.EndpointSlice{}
- for _, sliceName := range sliceNamesToDelete.UnsortedList() {
- slicesToDelete = append(slicesToDelete, slicesByName[sliceName])
- }
- return slicesToCreate, slicesToUpdate, slicesToDelete, numAdded, numRemoved
- }
- func (r *reconciler) deleteService(namespace, name string) {
- r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name})
- }
|