12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954 |
- package endpoint
- import (
- "fmt"
- "net/http"
- "net/http/httptest"
- "reflect"
- "strconv"
- "testing"
- "time"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/diff"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/informers"
- clientset "k8s.io/client-go/kubernetes"
- clientscheme "k8s.io/client-go/kubernetes/scheme"
- restclient "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/cache"
- utiltesting "k8s.io/client-go/util/testing"
- featuregatetesting "k8s.io/component-base/featuregate/testing"
- endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
- api "k8s.io/kubernetes/pkg/apis/core"
- "k8s.io/kubernetes/pkg/controller"
- endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
- "k8s.io/kubernetes/pkg/features"
- )
// Shared fixtures used throughout the endpoint controller tests.
var (
	// alwaysReady is a sync-check stub that always reports true.
	alwaysReady = func() bool { return true }
	// neverReady is a sync-check stub that always reports false.
	neverReady = func() bool { return false }
	// emptyNodeName is an empty node name; tests take its address when they
	// need a *string for EndpointAddress.NodeName.
	emptyNodeName string

	// triggerTime is a fixed reference instant: 2018-01-01 00:00:00 UTC.
	triggerTime = time.Date(2018, time.January, 1, 0, 0, 0, 0, time.UTC)
	// triggerTimeString is triggerTime formatted as RFC 3339 with nanoseconds.
	triggerTimeString = triggerTime.Format(time.RFC3339Nano)
	// oldTriggerTimeString is the instant one hour before triggerTime, in the
	// same format.
	oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
)
- func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
- p := &v1.Pod{
- TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: fmt.Sprintf("pod%d", id),
- Labels: map[string]string{"foo": "bar"},
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
- },
- Status: v1.PodStatus{
- PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionTrue,
- },
- },
- },
- }
- if !isReady {
- p.Status.Conditions[0].Status = v1.ConditionFalse
- }
- for j := 0; j < nPorts; j++ {
- p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
- v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
- }
- if makeDualstack {
- p.Status.PodIPs = []v1.PodIP{
- {
- IP: p.Status.PodIP,
- },
- {
- IP: fmt.Sprintf("2000::%d", id),
- },
- }
- }
- return p
- }
- func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
- for i := 0; i < nPods+nNotReady; i++ {
- isReady := i < nPods
- pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
- store.Add(pod)
- }
- }
- func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
- for i := 0; i < nPods; i++ {
- p := &v1.Pod{
- TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: fmt.Sprintf("pod%d", i),
- Labels: map[string]string{"foo": "bar"},
- },
- Spec: v1.PodSpec{
- RestartPolicy: restartPolicy,
- Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
- },
- Status: v1.PodStatus{
- PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
- Phase: podPhase,
- Conditions: []v1.PodCondition{
- {
- Type: v1.PodReady,
- Status: v1.ConditionFalse,
- },
- },
- },
- }
- for j := 0; j < nPorts; j++ {
- p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
- v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
- }
- store.Add(p)
- }
- }
- func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
- fakeEndpointsHandler := utiltesting.FakeHandler{
- StatusCode: http.StatusOK,
- ResponseBody: runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}),
- }
- mux := http.NewServeMux()
- if namespace == "" {
- t.Fatal("namespace cannot be empty")
- }
- mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints", &fakeEndpointsHandler)
- mux.Handle("/api/v1/namespaces/"+namespace+"/endpoints/", &fakeEndpointsHandler)
- mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
- t.Errorf("unexpected request: %v", req.RequestURI)
- http.Error(res, "", http.StatusNotFound)
- })
- return httptest.NewServer(mux), &fakeEndpointsHandler
- }
- type endpointController struct {
- *EndpointController
- podStore cache.Store
- serviceStore cache.Store
- endpointsStore cache.Store
- }
- func newController(url string, batchPeriod time.Duration) *endpointController {
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
- informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
- endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
- informerFactory.Core().V1().Endpoints(), client, batchPeriod)
- endpoints.podsSynced = alwaysReady
- endpoints.servicesSynced = alwaysReady
- endpoints.endpointsSynced = alwaysReady
- return &endpointController{
- endpoints,
- informerFactory.Core().V1().Pods().Informer().GetStore(),
- informerFactory.Core().V1().Services().Informer().GetStore(),
- informerFactory.Core().V1().Endpoints().Informer().GetStore(),
- }
- }
- func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: nil,
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsNewNoSubsets(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- }
- func TestCheckLeftoverEndpoints(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, _ := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- endpoints.checkLeftoverEndpoints()
- if e, a := 1, endpoints.queue.Len(); e != a {
- t.Fatalf("Expected %v, got %v", e, a)
- }
- got, _ := endpoints.queue.Get()
- if e, a := ns+"/foo", got; e != a {
- t.Errorf("Expected %v, got %v", e, a)
- }
- }
- func TestSyncEndpointsProtocolTCP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsProtocolUDP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsProtocolSCTP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 0, 1, 1, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{},
- })
- addPods(endpoints.podStore, ns, 1, 1, 1, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexisting(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "1",
- Name: "foo",
- Namespace: ns,
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestSyncEndpointsItems(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- addPods(endpoints.podStore, ns, 3, 2, 0, false)
- addPods(endpoints.podStore, "blah", 5, 2, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService("other/foo")
- expectedSubsets := []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []v1.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "",
- Name: "foo",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
- }
- func TestSyncEndpointsItemsWithLabels(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- addPods(endpoints.podStore, ns, 3, 2, 0, false)
- serviceLabels := map[string]string{"foo": "bar"}
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService(ns + "/foo")
- expectedSubsets := []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []v1.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- serviceLabels[v1.IsHeadlessService] = ""
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- ResourceVersion: "",
- Name: "foo",
- Labels: serviceLabels,
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
- }
- func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- serviceLabels := map[string]string{"baz": "blah"}
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- serviceLabels[v1.IsHeadlessService] = ""
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: serviceLabels,
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
- var tests = []struct {
- podsSynced func() bool
- servicesSynced func() bool
- endpointsSynced func() bool
- shouldUpdateEndpoints bool
- }{
- {neverReady, alwaysReady, alwaysReady, false},
- {alwaysReady, neverReady, alwaysReady, false},
- {alwaysReady, alwaysReady, neverReady, false},
- {alwaysReady, alwaysReady, alwaysReady, true},
- }
- for _, test := range tests {
- func() {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- }
- endpoints.serviceStore.Add(service)
- endpoints.onServiceUpdate(service)
- endpoints.podsSynced = test.podsSynced
- endpoints.servicesSynced = test.servicesSynced
- endpoints.endpointsSynced = test.endpointsSynced
- endpoints.workerLoopPeriod = 10 * time.Millisecond
- stopCh := make(chan struct{})
- defer close(stopCh)
- go endpoints.Run(1, stopCh)
-
-
-
- time.Sleep(150 * time.Millisecond)
- if test.shouldUpdateEndpoints {
-
- wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
- return endpoints.queue.Len() == 0, nil
- })
- endpointsHandler.ValidateRequestCount(t, 1)
- } else {
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- }()
- }
- }
- func TestSyncEndpointsHeadlessService(t *testing.T) {
- ns := "headless"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- service := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, Labels: map[string]string{"a": "b"}},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- ClusterIP: api.ClusterIPNone,
- Ports: []v1.ServicePort{},
- },
- }
- originalService := service.DeepCopy()
- endpoints.serviceStore.Add(service)
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "a": "b",
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{},
- }},
- })
- if !reflect.DeepEqual(originalService, service) {
- t.Fatalf("syncing endpoints changed service: %s", diff.ObjectReflectDiff(service, originalService))
- }
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- ClusterIP: "None",
- Ports: nil,
- },
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: nil,
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
- }
- func TestShouldPodBeInEndpoints(t *testing.T) {
- testCases := []struct {
- name string
- pod *v1.Pod
- expected bool
- }{
-
- {
- name: "Failed pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodFailed,
- },
- },
- expected: false,
- },
- {
- name: "Succeeded pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodSucceeded,
- },
- },
- expected: false,
- },
- {
- name: "Succeeded pod with OnFailure RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyOnFailure,
- },
- Status: v1.PodStatus{
- Phase: v1.PodSucceeded,
- },
- },
- expected: false,
- },
-
- {
- name: "Failed pod with Always RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyAlways,
- },
- Status: v1.PodStatus{
- Phase: v1.PodFailed,
- },
- },
- expected: true,
- },
- {
- name: "Pending pod with Never RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyNever,
- },
- Status: v1.PodStatus{
- Phase: v1.PodPending,
- },
- },
- expected: true,
- },
- {
- name: "Unknown pod with OnFailure RestartPolicy",
- pod: &v1.Pod{
- Spec: v1.PodSpec{
- RestartPolicy: v1.RestartPolicyOnFailure,
- },
- Status: v1.PodStatus{
- Phase: v1.PodUnknown,
- },
- },
- expected: true,
- },
- }
- for _, test := range testCases {
- result := shouldPodBeInEndpoints(test.pod)
- if result != test.expected {
- t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
- }
- }
- }
- func TestPodToEndpointAddressForService(t *testing.T) {
- testCases := []struct {
- name string
- expectedEndPointIP string
- enableDualStack bool
- expectError bool
- enableDualStackPod bool
- service v1.Service
- }{
- {
- name: "v4 service, in a single stack cluster",
- expectedEndPointIP: "1.2.3.4",
- enableDualStack: false,
- expectError: false,
- enableDualStackPod: false,
- service: v1.Service{
- Spec: v1.ServiceSpec{
- ClusterIP: "10.0.0.1",
- },
- },
- },
- {
- name: "v4 service, in a dual stack cluster",
- expectedEndPointIP: "1.2.3.4",
- enableDualStack: true,
- expectError: false,
- enableDualStackPod: true,
- service: v1.Service{
- Spec: v1.ServiceSpec{
- ClusterIP: "10.0.0.1",
- },
- },
- },
- {
- name: "v6 service, in a dual stack cluster. dual stack enabled",
- expectedEndPointIP: "2000::0",
- enableDualStack: true,
- expectError: false,
- enableDualStackPod: true,
- service: v1.Service{
- Spec: v1.ServiceSpec{
- ClusterIP: "3000::1",
- },
- },
- },
-
-
-
- {
- name: "v6 service, in a v4 only cluster. dual stack disabled",
- expectedEndPointIP: "1.2.3.4",
- enableDualStack: false,
- expectError: false,
- enableDualStackPod: false,
- service: v1.Service{
- Spec: v1.ServiceSpec{
- ClusterIP: "3000::1",
- },
- },
- },
- {
- name: "v6 service, in a v4 only cluster - dual stack enabled",
- expectedEndPointIP: "1.2.3.4",
- enableDualStack: true,
- expectError: true,
- enableDualStackPod: false,
- service: v1.Service{
- Spec: v1.ServiceSpec{
- ClusterIP: "3000::1",
- },
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
- podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
- ns := "test"
- addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
- pods := podStore.List()
- if len(pods) != 1 {
- t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
- }
- pod := pods[0].(*v1.Pod)
- epa, err := podToEndpointAddressForService(&tc.service, pod)
- if err != nil && !tc.expectError {
- t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
- }
- if err == nil && tc.expectError {
- t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
- }
- if err != nil && tc.expectError {
- return
- }
- if epa.IP != tc.expectedEndPointIP {
- t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
- }
- if *(epa.NodeName) != pod.Spec.NodeName {
- t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
- }
- if epa.TargetRef.Kind != "Pod" {
- t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
- }
- if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
- t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
- }
- if epa.TargetRef.Name != pod.ObjectMeta.Name {
- t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
- }
- if epa.TargetRef.UID != pod.ObjectMeta.UID {
- t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
- }
- if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
- t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
- }
- })
- }
- }
- func TestPodToEndpointAddress(t *testing.T) {
- podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
- ns := "test"
- addPods(podStore, ns, 1, 1, 0, false)
- pods := podStore.List()
- if len(pods) != 1 {
- t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
- return
- }
- pod := pods[0].(*v1.Pod)
- epa := podToEndpointAddress(pod)
- if epa.IP != pod.Status.PodIP {
- t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
- }
- if *(epa.NodeName) != pod.Spec.NodeName {
- t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
- }
- if epa.TargetRef.Kind != "Pod" {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
- }
- if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
- }
- if epa.TargetRef.Name != pod.ObjectMeta.Name {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
- }
- if epa.TargetRef.UID != pod.ObjectMeta.UID {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
- }
- if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
- t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
- }
- }
- func TestPodChanged(t *testing.T) {
- podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
- ns := "test"
- addPods(podStore, ns, 1, 1, 0, false)
- pods := podStore.List()
- if len(pods) != 1 {
- t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
- return
- }
- oldPod := pods[0].(*v1.Pod)
- newPod := oldPod.DeepCopy()
- if podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be unchanged for copied pod")
- }
- newPod.Spec.NodeName = "changed"
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed for pod with NodeName changed")
- }
- newPod.Spec.NodeName = oldPod.Spec.NodeName
- newPod.ObjectMeta.ResourceVersion = "changed"
- if podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
- }
- newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
- newPod.Status.PodIP = "1.2.3.1"
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed with pod IP address change")
- }
- newPod.Status.PodIP = oldPod.Status.PodIP
-
-
-
- newPod.Status.PodIP = "1.1.3.1"
- newPod.Status.PodIPs = []v1.PodIP{
- {
- IP: "1.1.3.1",
- },
- {
- IP: "2000::1",
- },
- }
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed with adding secondary IP")
- }
-
- newPod.Status.PodIPs = nil
- newPod.Status.PodIP = oldPod.Status.PodIP
-
- saved := oldPod.Status.PodIP
- oldPod.Status.PodIP = "1.1.3.1"
- oldPod.Status.PodIPs = []v1.PodIP{
- {
- IP: "1.1.3.1",
- },
- {
- IP: "2000::1",
- },
- }
- newPod.Status.PodIP = "1.2.3.4"
- newPod.Status.PodIPs = []v1.PodIP{
- {
- IP: "1.2.3.4",
- },
- }
-
- oldPod.Status.PodIPs = nil
- newPod.Status.PodIPs = nil
- oldPod.Status.PodIP = saved
- newPod.Status.PodIP = saved
-
-
- saved = oldPod.Status.PodIP
- oldPod.Status.PodIP = "1.1.3.1"
- oldPod.Status.PodIPs = []v1.PodIP{
- {
- IP: "1.1.3.1",
- },
- {
- IP: "2000::1",
- },
- }
- newPod.Status.PodIP = "1.2.3.4"
- newPod.Status.PodIPs = []v1.PodIP{
- {
- IP: "1.2.3.4",
- },
- {
- IP: "2000::2",
- },
- }
-
- oldPod.Status.PodIPs = nil
- newPod.Status.PodIPs = nil
- oldPod.Status.PodIP = saved
- newPod.Status.PodIP = saved
-
- newPod.ObjectMeta.Name = "wrong-name"
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed with pod name change")
- }
- newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
- saveConditions := oldPod.Status.Conditions
- oldPod.Status.Conditions = nil
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed with pod readiness change")
- }
- oldPod.Status.Conditions = saveConditions
- now := metav1.NewTime(time.Now().UTC())
- newPod.ObjectMeta.DeletionTimestamp = &now
- if !podChangedHelper(oldPod, newPod, endpointChanged) {
- t.Errorf("Expected pod to be changed with DeletionTimestamp change")
- }
- newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
- }
- func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0*time.Second)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Annotations: map[string]string{
- v1.EndpointsLastChangeTriggerTime: triggerTimeString,
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- })
-
- addPods(endpoints.podStore, ns, 1, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{},
- Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- v1.IsHeadlessService: "",
- },
- },
- Subsets: []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
- }
- func TestPodUpdatesBatching(t *testing.T) {
- type podUpdate struct {
- delay time.Duration
- podName string
- podIP string
- }
- tests := []struct {
- name string
- batchPeriod time.Duration
- podsCount int
- updates []podUpdate
- finalDelay time.Duration
- wantRequestCount int
- }{
- {
- name: "three updates with no batching",
- batchPeriod: 0 * time.Second,
- podsCount: 10,
- updates: []podUpdate{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- podIP: "10.0.0.0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- podIP: "10.0.0.1",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod2",
- podIP: "10.0.0.2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 3,
- },
- {
- name: "three updates in one batch",
- batchPeriod: 1 * time.Second,
- podsCount: 10,
- updates: []podUpdate{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- podIP: "10.0.0.0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- podIP: "10.0.0.1",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod2",
- podIP: "10.0.0.2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 1,
- },
- {
- name: "three updates in two batches",
- batchPeriod: 1 * time.Second,
- podsCount: 10,
- updates: []podUpdate{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- podIP: "10.0.0.0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- podIP: "10.0.0.1",
- },
- {
- delay: 1 * time.Second,
- podName: "pod2",
- podIP: "10.0.0.2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 2,
- },
- }
- for _, tc := range tests {
- t.Run(tc.name, func(t *testing.T) {
- ns := "other"
- resourceVersion := 1
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, tc.batchPeriod)
- stopCh := make(chan struct{})
- defer close(stopCh)
- endpoints.podsSynced = alwaysReady
- endpoints.servicesSynced = alwaysReady
- endpoints.endpointsSynced = alwaysReady
- endpoints.workerLoopPeriod = 10 * time.Millisecond
- go endpoints.Run(1, stopCh)
- addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- for _, update := range tc.updates {
- time.Sleep(update.delay)
- old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
- if err != nil {
- t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
- }
- if !exists {
- t.Fatalf("Pod %q doesn't exist", update.podName)
- }
- oldPod := old.(*v1.Pod)
- newPod := oldPod.DeepCopy()
- newPod.Status.PodIP = update.podIP
- newPod.ResourceVersion = strconv.Itoa(resourceVersion)
- resourceVersion++
- endpoints.podStore.Update(newPod)
- endpoints.updatePod(oldPod, newPod)
- }
- time.Sleep(tc.finalDelay)
- endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
- })
- }
- }
- func TestPodAddsBatching(t *testing.T) {
- type podAdd struct {
- delay time.Duration
- }
- tests := []struct {
- name string
- batchPeriod time.Duration
- adds []podAdd
- finalDelay time.Duration
- wantRequestCount int
- }{
- {
- name: "three adds with no batching",
- batchPeriod: 0 * time.Second,
- adds: []podAdd{
- {
-
- delay: 200 * time.Millisecond,
- },
- {
- delay: 100 * time.Millisecond,
- },
- {
- delay: 100 * time.Millisecond,
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 3,
- },
- {
- name: "three adds in one batch",
- batchPeriod: 1 * time.Second,
- adds: []podAdd{
- {
-
- delay: 200 * time.Millisecond,
- },
- {
- delay: 100 * time.Millisecond,
- },
- {
- delay: 100 * time.Millisecond,
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 1,
- },
- {
- name: "three adds in two batches",
- batchPeriod: 1 * time.Second,
- adds: []podAdd{
- {
-
- delay: 200 * time.Millisecond,
- },
- {
- delay: 100 * time.Millisecond,
- },
- {
- delay: 1 * time.Second,
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 2,
- },
- }
- for _, tc := range tests {
- t.Run(tc.name, func(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, tc.batchPeriod)
- stopCh := make(chan struct{})
- defer close(stopCh)
- endpoints.podsSynced = alwaysReady
- endpoints.servicesSynced = alwaysReady
- endpoints.endpointsSynced = alwaysReady
- endpoints.workerLoopPeriod = 10 * time.Millisecond
- go endpoints.Run(1, stopCh)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- for i, add := range tc.adds {
- time.Sleep(add.delay)
- p := testPod(ns, i, 1, true, false)
- endpoints.podStore.Add(p)
- endpoints.addPod(p)
- }
- time.Sleep(tc.finalDelay)
- endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
- })
- }
- }
- func TestPodDeleteBatching(t *testing.T) {
- type podDelete struct {
- delay time.Duration
- podName string
- }
- tests := []struct {
- name string
- batchPeriod time.Duration
- podsCount int
- deletes []podDelete
- finalDelay time.Duration
- wantRequestCount int
- }{
- {
- name: "three deletes with no batching",
- batchPeriod: 0 * time.Second,
- podsCount: 10,
- deletes: []podDelete{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 3,
- },
- {
- name: "three deletes in one batch",
- batchPeriod: 1 * time.Second,
- podsCount: 10,
- deletes: []podDelete{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 1,
- },
- {
- name: "three deletes in two batches",
- batchPeriod: 1 * time.Second,
- podsCount: 10,
- deletes: []podDelete{
- {
-
- delay: 200 * time.Millisecond,
- podName: "pod0",
- },
- {
- delay: 100 * time.Millisecond,
- podName: "pod1",
- },
- {
- delay: 1 * time.Second,
- podName: "pod2",
- },
- },
- finalDelay: 3 * time.Second,
- wantRequestCount: 2,
- },
- }
- for _, tc := range tests {
- t.Run(tc.name, func(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, tc.batchPeriod)
- stopCh := make(chan struct{})
- defer close(stopCh)
- endpoints.podsSynced = alwaysReady
- endpoints.servicesSynced = alwaysReady
- endpoints.endpointsSynced = alwaysReady
- endpoints.workerLoopPeriod = 10 * time.Millisecond
- go endpoints.Run(1, stopCh)
- addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
- endpoints.serviceStore.Add(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: v1.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Port: 80}},
- },
- })
- for _, update := range tc.deletes {
- time.Sleep(update.delay)
- old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
- if err != nil {
- t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
- }
- if !exists {
- t.Fatalf("Pod %q doesn't exist", update.podName)
- }
- endpoints.podStore.Delete(old)
- endpoints.deletePod(old)
- }
- time.Sleep(tc.finalDelay)
- endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
- })
- }
- }
- func TestSyncEndpointsServiceNotFound(t *testing.T) {
- ns := metav1.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns)
- defer testServer.Close()
- endpoints := newController(testServer.URL, 0)
- endpoints.endpointsStore.Add(&v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 1)
- endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
- }
- func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
- podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
- return podChanged
- }
|