12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966 |
- /*
- Copyright 2019 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package endpointslice
- import (
- "context"
- "fmt"
- "reflect"
- "strings"
- "testing"
- "time"
- dto "github.com/prometheus/client_model/go"
- "github.com/stretchr/testify/assert"
- corev1 "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- apiequality "k8s.io/apimachinery/pkg/api/equality"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes/fake"
- corelisters "k8s.io/client-go/listers/core/v1"
- k8stesting "k8s.io/client-go/testing"
- compmetrics "k8s.io/component-base/metrics"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
- utilpointer "k8s.io/utils/pointer"
- )
// defaultMaxEndpointsPerSlice is the per-slice endpoint limit passed to
// newReconciler by the tests in this file that do not set their own limit.
var defaultMaxEndpointsPerSlice int32 = 100
- // Even when there are no pods, we want to have a placeholder slice for each service
- func TestReconcileEmpty(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{}, time.Now())
- expectActions(t, client.Actions(), 1, "create", "endpointslices")
- slices := fetchEndpointSlices(t, client, namespace)
- assert.Len(t, slices, 1, "Expected 1 endpoint slices")
- assert.Regexp(t, "^"+svc.Name, slices[0].Name)
- assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
- assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
- assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
- expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "100")
- expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
- }
- // Given a single pod matching a service selector and no existing endpoint slices,
- // a slice should be created
- func TestReconcile1Pod(t *testing.T) {
- namespace := "test"
- ipv6Family := corev1.IPv6Protocol
- svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
- svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
- svcv6.Spec.IPFamily = &ipv6Family
- svcv6ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
- svcv6ClusterIP.Spec.ClusterIP = "1234::5678:0000:0000:9abc:def1"
- pod1 := newPod(1, namespace, true, 1)
- pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
- pod1.Spec.Hostname = "example-hostname"
- node1 := &corev1.Node{
- ObjectMeta: metav1.ObjectMeta{
- Name: pod1.Spec.NodeName,
- Labels: map[string]string{
- "topology.kubernetes.io/zone": "us-central1-a",
- "topology.kubernetes.io/region": "us-central1",
- },
- },
- }
- testCases := map[string]struct {
- service corev1.Service
- expectedAddressType discovery.AddressType
- expectedEndpoint discovery.Endpoint
- }{
- "ipv4": {
- service: svcv4,
- expectedAddressType: discovery.AddressTypeIPv4,
- expectedEndpoint: discovery.Endpoint{
- Addresses: []string{"1.2.3.4"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{
- "kubernetes.io/hostname": "node-1",
- "topology.kubernetes.io/zone": "us-central1-a",
- "topology.kubernetes.io/region": "us-central1",
- },
- TargetRef: &corev1.ObjectReference{
- Kind: "Pod",
- Namespace: namespace,
- Name: "pod1",
- },
- },
- },
- "ipv6": {
- service: svcv6,
- expectedAddressType: discovery.AddressTypeIPv6,
- expectedEndpoint: discovery.Endpoint{
- Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{
- "kubernetes.io/hostname": "node-1",
- "topology.kubernetes.io/zone": "us-central1-a",
- "topology.kubernetes.io/region": "us-central1",
- },
- TargetRef: &corev1.ObjectReference{
- Kind: "Pod",
- Namespace: namespace,
- Name: "pod1",
- },
- },
- },
- "ipv6-clusterip": {
- service: svcv6ClusterIP,
- expectedAddressType: discovery.AddressTypeIPv6,
- expectedEndpoint: discovery.Endpoint{
- Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{
- "kubernetes.io/hostname": "node-1",
- "topology.kubernetes.io/zone": "us-central1-a",
- "topology.kubernetes.io/region": "us-central1",
- },
- TargetRef: &corev1.ObjectReference{
- Kind: "Pod",
- Namespace: namespace,
- Name: "pod1",
- },
- },
- },
- }
- for name, testCase := range testCases {
- t.Run(name, func(t *testing.T) {
- client := newClientset()
- setupMetrics()
- triggerTime := time.Now()
- r := newReconciler(client, []*corev1.Node{node1}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &testCase.service, []*corev1.Pod{pod1}, []*discovery.EndpointSlice{}, triggerTime)
- if len(client.Actions()) != 1 {
- t.Errorf("Expected 1 clientset action, got %d", len(client.Actions()))
- }
- slices := fetchEndpointSlices(t, client, namespace)
- if len(slices) != 1 {
- t.Fatalf("Expected 1 EndpointSlice, got %d", len(slices))
- }
- slice := slices[0]
- if !strings.HasPrefix(slice.Name, testCase.service.Name) {
- t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
- }
- if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
- t.Errorf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
- }
- if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
- t.Errorf("Expected EndpointSlice trigger time annotation to be %s, got %s", triggerTime.Format(time.RFC3339Nano), slice.Annotations[corev1.EndpointsLastChangeTriggerTime])
- }
- if len(slice.Endpoints) != 1 {
- t.Fatalf("Expected 1 Endpoint, got %d", len(slice.Endpoints))
- }
- endpoint := slice.Endpoints[0]
- if !reflect.DeepEqual(endpoint, testCase.expectedEndpoint) {
- t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
- }
- expectTrackedResourceVersion(t, r.endpointSliceTracker, &slice, "100")
- expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
- })
- }
- }
- // given an existing endpoint slice and no pods matching the service, the existing
- // slice should be updated to a placeholder (not deleted)
- func TestReconcile1EndpointSlice(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
- _, createErr := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice1, metav1.CreateOptions{})
- assert.Nil(t, createErr, "Expected no error creating endpoint slice")
- numActionsBefore := len(client.Actions())
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{endpointSlice1}, time.Now())
- assert.Len(t, client.Actions(), numActionsBefore+1, "Expected 1 additional clientset action")
- actions := client.Actions()
- assert.True(t, actions[numActionsBefore].Matches("update", "endpointslices"), "Action should be update endpoint slice")
- slices := fetchEndpointSlices(t, client, namespace)
- assert.Len(t, slices, 1, "Expected 1 endpoint slices")
- assert.Regexp(t, "^"+svc.Name, slices[0].Name)
- assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
- assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
- assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
- expectTrackedResourceVersion(t, r.endpointSliceTracker, &slices[0], "200")
- expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
- }
- // when a Service has PublishNotReadyAddresses set to true, corresponding
- // Endpoints should be considered ready, even if the backing Pod is not.
- func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
- client := newClientset()
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- svc.Spec.PublishNotReadyAddresses = true
- // start with 50 pods, 1/3 not ready
- pods := []*corev1.Pod{}
- for i := 0; i < 50; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- // Only 1 action, an EndpointSlice create
- assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
- expectActions(t, client.Actions(), 1, "create", "endpointslices")
- // Two endpoint slices should be completely full, the remainder should be in another one
- endpointSlices := fetchEndpointSlices(t, client, namespace)
- for _, endpointSlice := range endpointSlices {
- for i, endpoint := range endpointSlice.Endpoints {
- if !*endpoint.Conditions.Ready {
- t.Errorf("Expected endpoints[%d] to be ready", i)
- }
- }
- }
- expectUnorderedSlicesWithLengths(t, endpointSlices, []int{50})
- }
- // a simple use case with 250 pods matching a service and no existing slices
- // reconcile should create 3 slices, completely filling 2 of them
- func TestReconcileManyPods(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- // start with 250 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 250; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- // This is an ideal scenario where only 3 actions are required, and they're all creates
- assert.Len(t, client.Actions(), 3, "Expected 3 additional clientset actions")
- expectActions(t, client.Actions(), 3, "create", "endpointslices")
- // Two endpoint slices should be completely full, the remainder should be in another one
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
- expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0})
- }
- // now with preexisting slices, we have 250 pods matching a service
- // the first endpoint slice contains 62 endpoints, all desired
- // the second endpoint slice contains 61 endpoints, all desired
- // that leaves 127 to add
- // to minimize writes, our strategy is to create new slices for multiples of 100
- // that leaves 27 to drop in an existing slice
- // dropping them in the first slice will result in the slice being closest to full
- // this approach requires 1 update + 1 create instead of 2 updates + 1 create
- func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- // start with 250 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 250; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- // have approximately 1/4 in first slice
- endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
- for i := 1; i < len(pods)-4; i += 4 {
- endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
- }
- // have approximately 1/4 in second slice
- endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
- for i := 3; i < len(pods)-4; i += 4 {
- endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
- }
- existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
- cmc := newCacheMutationCheck(existingSlices)
- createEndpointSlices(t, client, namespace, existingSlices)
- numActionsBefore := len(client.Actions())
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- actions := client.Actions()
- assert.Equal(t, numActionsBefore+2, len(actions), "Expected 2 additional client actions as part of reconcile")
- assert.True(t, actions[numActionsBefore].Matches("create", "endpointslices"), "First action should be create endpoint slice")
- assert.True(t, actions[numActionsBefore+1].Matches("update", "endpointslices"), "Second action should be update endpoint slice")
- // 1 new slice (0->100) + 1 updated slice (62->89)
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
- expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
- // ensure cache mutation has not occurred
- cmc.Check(t)
- }
- // now with preexisting slices, we have 300 pods matching a service
- // this scenario will show some less ideal allocation
- // the first endpoint slice contains 74 endpoints, all desired
- // the second endpoint slice contains 74 endpoints, all desired
- // that leaves 152 to add
- // to minimize writes, our strategy is to create new slices for multiples of 100
- // that leaves 52 to drop in an existing slice
- // that capacity could fit if split in the 2 existing slices
- // to minimize writes though, reconcile create a new slice with those 52 endpoints
- // this approach requires 2 creates instead of 2 updates + 1 create
- func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- // start with 300 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 300; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- // have approximately 1/4 in first slice
- endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
- for i := 1; i < len(pods)-4; i += 4 {
- endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
- }
- // have approximately 1/4 in second slice
- endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
- for i := 3; i < len(pods)-4; i += 4 {
- endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, &svc))
- }
- existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
- cmc := newCacheMutationCheck(existingSlices)
- createEndpointSlices(t, client, namespace, existingSlices)
- numActionsBefore := len(client.Actions())
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- actions := client.Actions()
- assert.Equal(t, numActionsBefore+2, len(actions), "Expected 2 additional client actions as part of reconcile")
- expectActions(t, client.Actions(), 2, "create", "endpointslices")
- // 2 new slices (100, 52) in addition to existing slices (74, 74)
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
- expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
- // ensure cache mutation has not occurred
- cmc.Check(t)
- }
- // In some cases, such as a service port change, all slices for that service will require a change
- // This test ensures that we are updating those slices and not calling create + delete for each
- func TestReconcileEndpointSlicesUpdating(t *testing.T) {
- client := newClientset()
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- // start with 250 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 250; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- numActionsExpected := 3
- assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
- slices := fetchEndpointSlices(t, client, namespace)
- numActionsExpected++
- expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})
- svc.Spec.Ports[0].TargetPort.IntVal = 81
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())
- numActionsExpected += 3
- assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
- expectActions(t, client.Actions(), 3, "update", "endpointslices")
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
- }
- // In this test, we start with 10 slices that only have 30 endpoints each
- // An initial reconcile makes no changes (as desired to limit writes)
- // When we change a service port, all slices will need to be updated in some way
- // reconcile repacks the endpoints into 3 slices, and deletes the extras
- func TestReconcileEndpointSlicesRecycling(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- // start with 300 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 300; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- // generate 10 existing slices with 30 pods/endpoints each
- existingSlices := []*discovery.EndpointSlice{}
- for i, pod := range pods {
- sliceNum := i / 30
- if i%30 == 0 {
- existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
- }
- existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
- }
- cmc := newCacheMutationCheck(existingSlices)
- createEndpointSlices(t, client, namespace, existingSlices)
- numActionsBefore := len(client.Actions())
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- // initial reconcile should be a no op, all pods are accounted for in slices, no repacking should be done
- assert.Equal(t, numActionsBefore+0, len(client.Actions()), "Expected 0 additional client actions as part of reconcile")
- // changing a service port should require all slices to be updated, time for a repack
- svc.Spec.Ports[0].TargetPort.IntVal = 81
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- // this should reflect 3 updates + 7 deletes
- assert.Equal(t, numActionsBefore+10, len(client.Actions()), "Expected 10 additional client actions as part of reconcile")
- // thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
- expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
- // ensure cache mutation has not occurred
- cmc.Check(t)
- }
- // In this test, we want to verify that endpoints are added to a slice that will
- // be closest to full after the operation, even when slices are already marked
- // for update.
- func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- existingSlices := []*discovery.EndpointSlice{}
- pods := []*corev1.Pod{}
- slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
- for i := 0; i < 80; i++ {
- pod := newPod(i, namespace, true, 1)
- slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
- pods = append(pods, pod)
- }
- existingSlices = append(existingSlices, slice1)
- slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
- for i := 100; i < 120; i++ {
- pod := newPod(i, namespace, true, 1)
- slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
- pods = append(pods, pod)
- }
- existingSlices = append(existingSlices, slice2)
- cmc := newCacheMutationCheck(existingSlices)
- createEndpointSlices(t, client, namespace, existingSlices)
- // ensure that endpoints in each slice will be marked for update.
- for i, pod := range pods {
- if i%10 == 0 {
- pod.Status.Conditions = []corev1.PodCondition{{
- Type: corev1.PodReady,
- Status: corev1.ConditionFalse,
- }}
- }
- }
- // add a few additional endpoints - no more than could fit in either slice.
- for i := 200; i < 215; i++ {
- pods = append(pods, newPod(i, namespace, true, 1))
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- // ensure that both endpoint slices have been updated
- expectActions(t, client.Actions(), 2, "update", "endpointslices")
- expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0})
- // additional pods should get added to fuller slice
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
- // ensure cache mutation has not occurred
- cmc.Check(t)
- }
- // In this test, we want to verify that old EndpointSlices with a deprecated IP
- // address type will be replaced with a newer IPv4 type.
- func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
- endpointMeta.AddressType = discovery.AddressTypeIP
- existingSlices := []*discovery.EndpointSlice{}
- pods := []*corev1.Pod{}
- slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
- for i := 0; i < 80; i++ {
- pod := newPod(i, namespace, true, 1)
- slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
- pods = append(pods, pod)
- }
- existingSlices = append(existingSlices, slice1)
- slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
- for i := 100; i < 150; i++ {
- pod := newPod(i, namespace, true, 1)
- slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
- pods = append(pods, pod)
- }
- existingSlices = append(existingSlices, slice2)
- createEndpointSlices(t, client, namespace, existingSlices)
- cmc := newCacheMutationCheck(existingSlices)
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
- // ensure that both original endpoint slices have been deleted
- expectActions(t, client.Actions(), 2, "delete", "endpointslices")
- endpointSlices := fetchEndpointSlices(t, client, namespace)
- // since this involved replacing both EndpointSlices, the result should be
- // perfectly packed.
- expectUnorderedSlicesWithLengths(t, endpointSlices, []int{100, 30})
- for _, endpointSlice := range endpointSlices {
- if endpointSlice.AddressType != discovery.AddressTypeIPv4 {
- t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
- }
- }
- // ensure cache mutation has not occurred
- cmc.Check(t)
- }
- // Named ports can map to different port numbers on different pods.
- // This test ensures that EndpointSlices are grouped correctly in that case.
- func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- portNameIntStr := intstr.IntOrString{
- Type: intstr.String,
- StrVal: "http",
- }
- svc := corev1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "named-port-example", Namespace: namespace},
- Spec: corev1.ServiceSpec{
- Ports: []corev1.ServicePort{{
- TargetPort: portNameIntStr,
- Protocol: corev1.ProtocolTCP,
- }},
- Selector: map[string]string{"foo": "bar"},
- },
- }
- // start with 300 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 300; i++ {
- ready := !(i%3 == 0)
- portOffset := i % 5
- pod := newPod(i, namespace, ready, 1)
- pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{
- Name: portNameIntStr.StrVal,
- ContainerPort: int32(8080 + portOffset),
- Protocol: corev1.ProtocolTCP,
- }}
- pods = append(pods, pod)
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- // reconcile should create 5 endpoint slices
- assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile")
- expectActions(t, client.Actions(), 5, "create", "endpointslices")
- expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0})
- fetchedSlices := fetchEndpointSlices(t, client, namespace)
- // each slice should have 60 endpoints to match 5 unique variations of named port mapping
- expectUnorderedSlicesWithLengths(t, fetchedSlices, []int{60, 60, 60, 60, 60})
- // generate data structures for expected slice ports and address types
- protoTCP := corev1.ProtocolTCP
- expectedSlices := []discovery.EndpointSlice{}
- for i := range fetchedSlices {
- expectedSlices = append(expectedSlices, discovery.EndpointSlice{
- Ports: []discovery.EndpointPort{{
- Name: utilpointer.StringPtr(""),
- Protocol: &protoTCP,
- Port: utilpointer.Int32Ptr(int32(8080 + i)),
- }},
- AddressType: discovery.AddressTypeIPv4,
- })
- }
- // slices fetched should match expected address type and ports
- expectUnorderedSlicesWithTopLevelAttrs(t, fetchedSlices, expectedSlices)
- }
- // This test ensures that maxEndpointsPerSlice configuration results in
- // appropriate endpoints distribution among slices
- func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- // start with 250 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 250; i++ {
- ready := !(i%3 == 0)
- pods = append(pods, newPod(i, namespace, ready, 1))
- }
- testCases := []struct {
- maxEndpointsPerSlice int32
- expectedSliceLengths []int
- expectedMetricValues expectedMetrics
- }{
- {
- maxEndpointsPerSlice: int32(50),
- expectedSliceLengths: []int{50, 50, 50, 50, 50},
- expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5},
- }, {
- maxEndpointsPerSlice: int32(80),
- expectedSliceLengths: []int{80, 80, 80, 10},
- expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4},
- }, {
- maxEndpointsPerSlice: int32(150),
- expectedSliceLengths: []int{150, 100},
- expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2},
- }, {
- maxEndpointsPerSlice: int32(250),
- expectedSliceLengths: []int{250},
- expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
- }, {
- maxEndpointsPerSlice: int32(500),
- expectedSliceLengths: []int{250},
- expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
- },
- }
- for _, testCase := range testCases {
- t.Run(fmt.Sprintf("maxEndpointsPerSlice: %d", testCase.maxEndpointsPerSlice), func(t *testing.T) {
- client := newClientset()
- setupMetrics()
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths)
- expectMetrics(t, testCase.expectedMetricValues)
- })
- }
- }
- func TestReconcileEndpointSlicesMetrics(t *testing.T) {
- client := newClientset()
- setupMetrics()
- namespace := "test"
- svc, _ := newServiceAndEndpointMeta("foo", namespace)
- // start with 20 pods
- pods := []*corev1.Pod{}
- for i := 0; i < 20; i++ {
- pods = append(pods, newPod(i, namespace, true, 1))
- }
- r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
- reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
- actions := client.Actions()
- assert.Equal(t, 1, len(actions), "Expected 1 additional client actions as part of reconcile")
- assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice")
- expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
- fetchedSlices := fetchEndpointSlices(t, client, namespace)
- reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now())
- expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0})
- }
- // Test Helpers
- func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler {
- informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
- nodeInformer := informerFactory.Core().V1().Nodes()
- indexer := nodeInformer.Informer().GetIndexer()
- for _, node := range nodes {
- indexer.Add(node)
- }
- return &reconciler{
- client: client,
- nodeLister: corelisters.NewNodeLister(indexer),
- maxEndpointsPerSlice: maxEndpointsPerSlice,
- endpointSliceTracker: newEndpointSliceTracker(),
- metricsCache: metrics.NewCache(maxEndpointsPerSlice),
- }
- }
- // ensures endpoint slices exist with the desired set of lengths
- func expectUnorderedSlicesWithLengths(t *testing.T, endpointSlices []discovery.EndpointSlice, expectedLengths []int) {
- assert.Len(t, endpointSlices, len(expectedLengths), "Expected %d endpoint slices", len(expectedLengths))
- lengthsWithNoMatch := []int{}
- desiredLengths := expectedLengths
- actualLengths := []int{}
- for _, endpointSlice := range endpointSlices {
- actualLen := len(endpointSlice.Endpoints)
- actualLengths = append(actualLengths, actualLen)
- matchFound := false
- for i := 0; i < len(desiredLengths); i++ {
- if desiredLengths[i] == actualLen {
- matchFound = true
- desiredLengths = append(desiredLengths[:i], desiredLengths[i+1:]...)
- break
- }
- }
- if !matchFound {
- lengthsWithNoMatch = append(lengthsWithNoMatch, actualLen)
- }
- }
- if len(lengthsWithNoMatch) > 0 || len(desiredLengths) > 0 {
- t.Errorf("Actual slice lengths (%v) don't match expected (%v)", actualLengths, expectedLengths)
- }
- }
- // ensures endpoint slices exist with the desired set of ports and address types
- func expectUnorderedSlicesWithTopLevelAttrs(t *testing.T, endpointSlices []discovery.EndpointSlice, expectedSlices []discovery.EndpointSlice) {
- t.Helper()
- assert.Len(t, endpointSlices, len(expectedSlices), "Expected %d endpoint slices", len(expectedSlices))
- slicesWithNoMatch := []discovery.EndpointSlice{}
- for _, endpointSlice := range endpointSlices {
- matchFound := false
- for i := 0; i < len(expectedSlices); i++ {
- if portsAndAddressTypeEqual(expectedSlices[i], endpointSlice) {
- matchFound = true
- expectedSlices = append(expectedSlices[:i], expectedSlices[i+1:]...)
- break
- }
- }
- if !matchFound {
- slicesWithNoMatch = append(slicesWithNoMatch, endpointSlice)
- }
- }
- assert.Len(t, slicesWithNoMatch, 0, "EndpointSlice(s) found without matching attributes")
- assert.Len(t, expectedSlices, 0, "Expected slices(s) not found in EndpointSlices")
- }
- func expectActions(t *testing.T, actions []k8stesting.Action, num int, verb, resource string) {
- t.Helper()
- for i := 0; i < num; i++ {
- relativePos := len(actions) - i - 1
- assert.Equal(t, verb, actions[relativePos].GetVerb(), "Expected action -%d verb to be %s", i, verb)
- assert.Equal(t, resource, actions[relativePos].GetResource().Resource, "Expected action -%d resource to be %s", i, resource)
- }
- }
- func expectTrackedResourceVersion(t *testing.T, tracker *endpointSliceTracker, slice *discovery.EndpointSlice, expectedRV string) {
- rrv := tracker.relatedResourceVersions(slice)
- rv, tracked := rrv[slice.Name]
- if !tracked {
- t.Fatalf("Expected EndpointSlice %s to be tracked", slice.Name)
- }
- if rv != expectedRV {
- t.Errorf("Expected ResourceVersion of %s to be %s, got %s", slice.Name, expectedRV, rv)
- }
- }
- func portsAndAddressTypeEqual(slice1, slice2 discovery.EndpointSlice) bool {
- return apiequality.Semantic.DeepEqual(slice1.Ports, slice2.Ports) && apiequality.Semantic.DeepEqual(slice1.AddressType, slice2.AddressType)
- }
- func createEndpointSlices(t *testing.T, client *fake.Clientset, namespace string, endpointSlices []*discovery.EndpointSlice) {
- t.Helper()
- for _, endpointSlice := range endpointSlices {
- _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
- if err != nil {
- t.Fatalf("Expected no error creating Endpoint Slice, got: %v", err)
- }
- }
- }
- func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) []discovery.EndpointSlice {
- t.Helper()
- fetchedSlices, err := client.DiscoveryV1beta1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
- if err != nil {
- t.Fatalf("Expected no error fetching Endpoint Slices, got: %v", err)
- return []discovery.EndpointSlice{}
- }
- return fetchedSlices.Items
- }
- func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) {
- t.Helper()
- err := r.reconcile(service, pods, existingSlices, triggerTime)
- if err != nil {
- t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err)
- }
- }
- // Metrics helpers
// expectedMetrics lists the metric values a test expects to observe; it is
// consumed by expectMetrics. Fields left unset are expected to be zero.
type expectedMetrics struct {
	// Compared against gauge metrics.
	desiredSlices, actualSlices, desiredEndpoints int

	// Compared against histogram metrics.
	addedPerSync, removedPerSync int

	// Compared against the EndpointSliceChanges counter for the "create",
	// "update" and "delete" label values respectively.
	numCreated, numUpdated, numDeleted int
}
- func expectMetrics(t *testing.T, em expectedMetrics) {
- t.Helper()
- actualDesiredSlices := getGaugeMetricValue(t, metrics.DesiredEndpointSlices.WithLabelValues())
- if actualDesiredSlices != float64(em.desiredSlices) {
- t.Errorf("Expected desiredEndpointSlices to be %d, got %v", em.desiredSlices, actualDesiredSlices)
- }
- actualNumSlices := getGaugeMetricValue(t, metrics.NumEndpointSlices.WithLabelValues())
- if actualDesiredSlices != float64(em.desiredSlices) {
- t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
- }
- actualEndpointsDesired := getGaugeMetricValue(t, metrics.EndpointsDesired.WithLabelValues())
- if actualEndpointsDesired != float64(em.desiredEndpoints) {
- t.Errorf("Expected desiredEndpoints to be %d, got %v", em.desiredEndpoints, actualEndpointsDesired)
- }
- actualAddedPerSync := getHistogramMetricValue(t, metrics.EndpointsAddedPerSync.WithLabelValues())
- if actualAddedPerSync != float64(em.addedPerSync) {
- t.Errorf("Expected endpointsAddedPerSync to be %d, got %v", em.addedPerSync, actualAddedPerSync)
- }
- actualRemovedPerSync := getHistogramMetricValue(t, metrics.EndpointsRemovedPerSync.WithLabelValues())
- if actualRemovedPerSync != float64(em.removedPerSync) {
- t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync)
- }
- actualCreated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("create"))
- if actualCreated != float64(em.numCreated) {
- t.Errorf("Expected endpointSliceChangesCreated to be %d, got %v", em.numCreated, actualCreated)
- }
- actualUpdated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("update"))
- if actualUpdated != float64(em.numUpdated) {
- t.Errorf("Expected endpointSliceChangesUpdated to be %d, got %v", em.numUpdated, actualUpdated)
- }
- actualDeleted := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("delete"))
- if actualDeleted != float64(em.numDeleted) {
- t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
- }
- }
- func setupMetrics() {
- metrics.RegisterMetrics()
- metrics.NumEndpointSlices.Delete(map[string]string{})
- metrics.DesiredEndpointSlices.Delete(map[string]string{})
- metrics.EndpointsDesired.Delete(map[string]string{})
- metrics.EndpointsAddedPerSync.Delete(map[string]string{})
- metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
- metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
- metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
- metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
- }
- func getGaugeMetricValue(t *testing.T, metric compmetrics.GaugeMetric) float64 {
- t.Helper()
- metricProto := &dto.Metric{}
- if err := metric.Write(metricProto); err != nil {
- t.Errorf("Error writing metric: %v", err)
- }
- return metricProto.Gauge.GetValue()
- }
- func getCounterMetricValue(t *testing.T, metric compmetrics.CounterMetric) float64 {
- t.Helper()
- metricProto := &dto.Metric{}
- if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
- t.Errorf("Error writing metric: %v", err)
- }
- return metricProto.Counter.GetValue()
- }
- func getHistogramMetricValue(t *testing.T, metric compmetrics.ObserverMetric) float64 {
- t.Helper()
- metricProto := &dto.Metric{}
- if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
- t.Errorf("Error writing metric: %v", err)
- }
- return metricProto.Histogram.GetSampleSum()
- }
|