12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package endpoint
- import (
- "fmt"
- "net/http"
- "net/http/httptest"
- "testing"
- "time"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/cache"
- utiltesting "k8s.io/client-go/util/testing"
- "k8s.io/kubernetes/pkg/api/testapi"
- endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/controller"
- )
// Shared fixtures used throughout the endpoint controller tests.
var (
	// alwaysReady is a synced-check stub that always reports true.
	alwaysReady = func() bool { return true }
	// neverReady is a synced-check stub that always reports false.
	neverReady = func() bool { return false }
	// emptyNodeName is the empty node name; tests take its address when
	// they build EndpointAddress values.
	emptyNodeName string
	// triggerTime is a fixed reference instant: midnight UTC on 1 January 2018.
	triggerTime = time.Date(2018, time.January, 1, 0, 0, 0, 0, time.UTC)
	// triggerTimeString is triggerTime rendered as RFC 3339 with nanoseconds.
	triggerTimeString = triggerTime.Format(time.RFC3339Nano)
	// oldTriggerTimeString is the instant one hour before triggerTime, in the
	// same textual format.
	oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
)
- func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
- for i := 0; i < nPods+nNotReady; i++ {
- p := &v1.Pod{
- TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: fmt.Sprintf("pod%d", i),
- Labels: map[string]string{"foo": "bar"},
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
- },
- Status: v1.PodStatus{
- PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionTrue,
- },
- },
- },
- }
- if i >= nPods {
- p.Status.Conditions[0].Status = v1.ConditionFalse
- }
- for j := 0; j < nPorts; j++ {
- p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
- v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
- }
- store.Add(p)
- }
- }
- func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
- for i := 0; i < nPods; i++ {
- p := &v1.Pod{
- TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: fmt.Sprintf("pod%d", i),
- Labels: map[string]string{"foo": "bar"},
- },
- Spec: v1.PodSpec{
- RestartPolicy: restartPolicy,
- Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
- },
- Status: v1.PodStatus{
- PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
- Phase: podPhase,
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionFalse,
- },
- },
- },
- }
- for j := 0; j < nPorts; j++ {
- p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
- v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
- }
- store.Add(p)
- }
- }
- func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
- fakeEndpointsHandler := utiltesting.FakeHandler{
- StatusCode: http.StatusOK,
- ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}),
- }
- mux := http.NewServeMux()
- mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
- mux.Handle(testapi.Default.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
- mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
- t.Errorf("unexpected request: %v", req.RequestURI)
- http.Error(res, "", http.StatusNotFound)
- })
- return httptest.NewServer(mux), &fakeEndpointsHandler
- }
- type endpointController struct {
- *EndpointController
- podStore cache.Store
- serviceStore cache.Store
- endpointsStore cache.Store
- }
- func newController(url string) *endpointController {
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
- endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
- informerFactory.Core().V1().Endpoints(), client)
- endpoints.podsSynced = alwaysReady
- endpoints.servicesSynced = alwaysReady
- endpoints.endpointsSynced = alwaysReady
- return &endpointController{
- endpoints,
- informerFactory.Core().V1().Pods().Informer().GetStore(),
- informerFactory.Core().V1().Services().Informer().GetStore(),
- informerFactory.Core().V1().Endpoints().Informer().GetStore(),
- }
- }
- func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: nil,
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsNewNoSubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- }
- func TestCheckLeftoverEndpoints(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, _ := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- endpoints.checkLeftoverEndpoints()
- if e, a := 1, endpoints.queue.Len(); e != a {
- t.Fatalf("Expected %v, got %v", e, a)
- }
- got, _ := endpoints.queue.Get()
- if e, a := ns+"/foo", got; e != a {
- t.Errorf("Expected %v, got %v", e, a)
- }
- }
- func TestSyncEndpointsProtocolTCP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsProtocolUDP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsProtocolSCTP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 0, 1, 1)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 1, 1, 1)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexisting(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "1",
- Name: "foo",
- Namespace: ns,
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsItems(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- addPods(endpoints.podStore, ns, 3, 2, 0)
- addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService("other/foo")
- expectedSubsets := []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []v1.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "",
- Name: "foo",
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
- }
- func TestSyncEndpointsItemsWithLabels(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- addPods(endpoints.podStore, ns, 3, 2, 0)
- serviceLabels := map[string]string{"foo": "bar"}
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService(ns + "/foo")
- expectedSubsets := []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []v1.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "",
- Name: "foo",
- Labels: serviceLabels,
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
- }
- func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- serviceLabels := map[string]string{"baz": "blah"}
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: serviceLabels,
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
- var tests = []struct {
- podsSynced func() bool
- servicesSynced func() bool
- endpointsSynced func() bool
- shouldUpdateEndpoints bool
- }{
- {neverReady, alwaysReady, alwaysReady, false},
- {alwaysReady, neverReady, alwaysReady, false},
- {alwaysReady, alwaysReady, neverReady, false},
- {alwaysReady, alwaysReady, alwaysReady, true},
- }
- for _, test := range tests {
- func() {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- addPods(endpoints.podStore, ns, 1, 1, 0)
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- }
- endpoints.serviceStore.Add(service)
- endpoints.enqueueService(service)
- endpoints.podsSynced = test.podsSynced
- endpoints.servicesSynced = test.servicesSynced
- endpoints.endpointsSynced = test.endpointsSynced
- endpoints.workerLoopPeriod = 10 * time.Millisecond
- stopCh := make(chan struct{})
- defer close(stopCh)
- go endpoints.Run(1, stopCh)
- // cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
- // To ensure we get all updates, including unexpected ones, we need to wait at least as long as
- // a single cache sync period and worker period, with some fudge room.
- time.Sleep(150 * time.Millisecond)
- if test.shouldUpdateEndpoints {
- // Ensure the work queue has been processed by looping for up to a second to prevent flakes.
- wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
- return endpoints.queue.Len() == 0, nil
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- } else {
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- }()
- }
- }
- func TestSyncEndpointsHeadlessService(t *testing.T) {
- ns := "headless"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- ClusterIP: api.ClusterIPNone,
- Ports: []v1.ServicePort{},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{},
- }},
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- ClusterIP: "None",
- Ports: nil,
- },
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: nil,
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
- }
- // There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
- // Just list all of the 3 false cases and 3 of the 12 true cases.
- func TestShouldPodBeInEndpoints(t *testing.T) {
- testCases := []struct {
- name string
- pod *v1.Pod
- expected bool
- }{
- // Pod should not be in endpoints cases:
- {
- name: "Failed pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodFailed,
- },
- },
- expected: false,
- },
- {
- name: "Succeeded pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodSucceeded,
- },
- },
- expected: false,
- },
- {
- name: "Succeeded pod with OnFailure RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyOnFailure,
- },
- Status: v1.PodStatus{
- Phase: v1.PodSucceeded,
- },
- },
- expected: false,
- },
- // Pod should be in endpoints cases:
- {
- name: "Failed pod with Always RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyAlways,
- },
- Status: v1.PodStatus{
- Phase: v1.PodFailed,
- },
- },
- expected: true,
- },
- {
- name: "Pending pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodPending,
- },
- },
- expected: true,
- },
- {
- name: "Unknown pod with OnFailure RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyOnFailure,
- },
- Status: v1.PodStatus{
- Phase: v1.PodUnknown,
- },
- },
- expected: true,
- },
- }
- for _, test := range testCases {
- result := shouldPodBeInEndpoints(test.pod)
- if result != test.expected {
- t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
- }
- }
- }
- func TestPodToEndpointAddress(t *testing.T) {
- podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
- ns := "test"
- addPods(podStore, ns, 1, 1, 0)
- pods := podStore.List()
- if len(pods) != 1 {
- t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
- return
- }
- pod := pods[0].(*v1.Pod)
- epa := podToEndpointAddress(pod)
- if epa.IP != pod.Status.PodIP {
- t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
- }
- if *(epa.NodeName) != pod.Spec.NodeName {
- t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
- }
- if epa.TargetRef.Kind != "Pod" {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
- }
- if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
- }
- if epa.TargetRef.Name != pod.ObjectMeta.Name {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
- }
- if epa.TargetRef.UID != pod.ObjectMeta.UID {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
- }
- if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
- }
- }
- func TestPodChanged(t *testing.T) {
- podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
- ns := "test"
- addPods(podStore, ns, 1, 1, 0)
- pods := podStore.List()
- if len(pods) != 1 {
- t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
- return
- }
- oldPod := pods[0].(*v1.Pod)
- newPod := oldPod.DeepCopy()
- if podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be unchanged for copied pod")
- }
- newPod.Spec.NodeName = "changed"
- if !podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be changed for pod with NodeName changed")
- }
- newPod.Spec.NodeName = oldPod.Spec.NodeName
- newPod.ObjectMeta.ResourceVersion = "changed"
- if podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
- }
- newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
- newPod.Status.PodIP = "1.2.3.1"
- if !podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be changed with pod IP address change")
- }
- newPod.Status.PodIP = oldPod.Status.PodIP
- newPod.ObjectMeta.Name = "wrong-name"
- if !podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be changed with pod name change")
- }
- newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
- saveConditions := oldPod.Status.Conditions
- oldPod.Status.Conditions = nil
- if !podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be changed with pod readiness change")
- }
- oldPod.Status.Conditions = saveConditions
- now := metav1.NewTime(time.Now().UTC())
- newPod.ObjectMeta.DeletionTimestamp = &now
- if !podChanged(oldPod, newPod) {
- t.Errorf("Expected pod to be changed with DeletionTimestamp change")
- }
- newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
- }
- func TestDetermineNeededServiceUpdates(t *testing.T) {
- testCases := []struct {
- name string
- a sets.String
- b sets.String
- union sets.String
- xor sets.String
- }{
- {
- name: "no services changed",
- a: sets.NewString("a", "b", "c"),
- b: sets.NewString("a", "b", "c"),
- xor: sets.NewString(),
- union: sets.NewString("a", "b", "c"),
- },
- {
- name: "all old services removed, new services added",
- a: sets.NewString("a", "b", "c"),
- b: sets.NewString("d", "e", "f"),
- xor: sets.NewString("a", "b", "c", "d", "e", "f"),
- union: sets.NewString("a", "b", "c", "d", "e", "f"),
- },
- {
- name: "all old services removed, no new services added",
- a: sets.NewString("a", "b", "c"),
- b: sets.NewString(),
- xor: sets.NewString("a", "b", "c"),
- union: sets.NewString("a", "b", "c"),
- },
- {
- name: "no old services, but new services added",
- a: sets.NewString(),
- b: sets.NewString("a", "b", "c"),
- xor: sets.NewString("a", "b", "c"),
- union: sets.NewString("a", "b", "c"),
- },
- {
- name: "one service removed, one service added, two unchanged",
- a: sets.NewString("a", "b", "c"),
- b: sets.NewString("b", "c", "d"),
- xor: sets.NewString("a", "d"),
- union: sets.NewString("a", "b", "c", "d"),
- },
- {
- name: "no services",
- a: sets.NewString(),
- b: sets.NewString(),
- xor: sets.NewString(),
- union: sets.NewString(),
- },
- }
- for _, testCase := range testCases {
- retval := determineNeededServiceUpdates(testCase.a, testCase.b, false)
- if !retval.Equal(testCase.xor) {
- t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List())
- }
- retval = determineNeededServiceUpdates(testCase.a, testCase.b, true)
- if !retval.Equal(testCase.union) {
- t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List())
- }
- }
- }
- func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- // Neither pod nor service has trigger time, this should cause annotation to be cleared.
- addPods(endpoints.podStore, ns, 1, 1, 0)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{}, // Annotation not set anymore.
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
|