123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package iptables
- import (
- "bytes"
- "fmt"
- "net"
- "reflect"
- "strconv"
- "strings"
- "testing"
- "time"
- "k8s.io/klog"
- "github.com/stretchr/testify/assert"
- v1 "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/kubernetes/pkg/proxy"
- "k8s.io/kubernetes/pkg/proxy/healthcheck"
- utilproxy "k8s.io/kubernetes/pkg/proxy/util"
- proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
- utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
- "k8s.io/kubernetes/pkg/util/async"
- "k8s.io/kubernetes/pkg/util/conntrack"
- utiliptables "k8s.io/kubernetes/pkg/util/iptables"
- iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
- "k8s.io/utils/exec"
- fakeexec "k8s.io/utils/exec/testing"
- utilpointer "k8s.io/utils/pointer"
- )
- func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
- chainLines := utiliptables.GetChainLines(table, save)
- for chain, lineBytes := range chainLines {
- line := string(lineBytes)
- if expected, exists := expectedLines[chain]; exists {
- if expected != line {
- t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
- }
- } else {
- t.Errorf("getChainLines expected chain not present: %s", chain)
- }
- }
- }
- func TestGetChainLines(t *testing.T) {
- iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
- *nat
- :PREROUTING ACCEPT [2136997:197881818]
- :POSTROUTING ACCEPT [4284525:258542680]
- :OUTPUT ACCEPT [5901660:357267963]
- -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
- COMMIT
- # Completed on Wed Oct 29 14:56:01 2014`
- expected := map[utiliptables.Chain]string{
- utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2136997:197881818]",
- utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
- utiliptables.ChainOutput: ":OUTPUT ACCEPT [5901660:357267963]",
- }
- checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
- }
- func TestGetChainLinesMultipleTables(t *testing.T) {
- iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
- *nat
- :PREROUTING ACCEPT [2:138]
- :INPUT ACCEPT [0:0]
- :OUTPUT ACCEPT [0:0]
- :POSTROUTING ACCEPT [0:0]
- :DOCKER - [0:0]
- :KUBE-NODEPORT-CONTAINER - [0:0]
- :KUBE-NODEPORT-HOST - [0:0]
- :KUBE-PORTALS-CONTAINER - [0:0]
- :KUBE-PORTALS-HOST - [0:0]
- :KUBE-SVC-1111111111111111 - [0:0]
- :KUBE-SVC-2222222222222222 - [0:0]
- :KUBE-SVC-3333333333333333 - [0:0]
- :KUBE-SVC-4444444444444444 - [0:0]
- :KUBE-SVC-5555555555555555 - [0:0]
- :KUBE-SVC-6666666666666666 - [0:0]
- -A PREROUTING -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-CONTAINER
- -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
- -A PREROUTING -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-CONTAINER
- -A OUTPUT -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-HOST
- -A OUTPUT ! -d 127.0.0.0/8 -m addrtype --dst-type LOCAL -j DOCKER
- -A OUTPUT -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-HOST
- -A POSTROUTING -s 10.246.1.0/24 ! -o cbr0 -j MASQUERADE
- -A POSTROUTING -s 10.0.2.15/32 -d 10.0.2.15/32 -m comment --comment "handle pod connecting to self" -j MASQUERADE
- -A KUBE-PORTALS-CONTAINER -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
- -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
- -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
- -A KUBE-PORTALS-HOST -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
- -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
- -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
- -A KUBE-SVC-1111111111111111 -p udp -m comment --comment "kube-system/kube-dns:dns" -m recent --set --name KUBE-SVC-1111111111111111 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
- -A KUBE-SVC-2222222222222222 -m comment --comment "kube-system/kube-dns:dns-tcp" -j KUBE-SVC-3333333333333333
- -A KUBE-SVC-3333333333333333 -p tcp -m comment --comment "kube-system/kube-dns:dns-tcp" -m recent --set --name KUBE-SVC-3333333333333333 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
- -A KUBE-SVC-4444444444444444 -p tcp -m comment --comment "default/kubernetes:" -m recent --set --name KUBE-SVC-4444444444444444 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.245.1.2:443
- -A KUBE-SVC-5555555555555555 -m comment --comment "default/kubernetes:" -j KUBE-SVC-4444444444444444
- -A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
- COMMIT
- # Completed on Fri Aug 7 14:47:37 2015
- # Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
- *filter
- :INPUT ACCEPT [17514:83115836]
- :FORWARD ACCEPT [0:0]
- :OUTPUT ACCEPT [8909:688225]
- :DOCKER - [0:0]
- -A FORWARD -o cbr0 -j DOCKER
- -A FORWARD -o cbr0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
- -A FORWARD -i cbr0 ! -o cbr0 -j ACCEPT
- -A FORWARD -i cbr0 -o cbr0 -j ACCEPT
- COMMIT
- `
- expected := map[utiliptables.Chain]string{
- utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2:138]",
- utiliptables.Chain("INPUT"): ":INPUT ACCEPT [0:0]",
- utiliptables.Chain("OUTPUT"): ":OUTPUT ACCEPT [0:0]",
- utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [0:0]",
- utiliptables.Chain("DOCKER"): ":DOCKER - [0:0]",
- utiliptables.Chain("KUBE-NODEPORT-CONTAINER"): ":KUBE-NODEPORT-CONTAINER - [0:0]",
- utiliptables.Chain("KUBE-NODEPORT-HOST"): ":KUBE-NODEPORT-HOST - [0:0]",
- utiliptables.Chain("KUBE-PORTALS-CONTAINER"): ":KUBE-PORTALS-CONTAINER - [0:0]",
- utiliptables.Chain("KUBE-PORTALS-HOST"): ":KUBE-PORTALS-HOST - [0:0]",
- utiliptables.Chain("KUBE-SVC-1111111111111111"): ":KUBE-SVC-1111111111111111 - [0:0]",
- utiliptables.Chain("KUBE-SVC-2222222222222222"): ":KUBE-SVC-2222222222222222 - [0:0]",
- utiliptables.Chain("KUBE-SVC-3333333333333333"): ":KUBE-SVC-3333333333333333 - [0:0]",
- utiliptables.Chain("KUBE-SVC-4444444444444444"): ":KUBE-SVC-4444444444444444 - [0:0]",
- utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
- utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
- }
- checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
- }
- func TestDeleteEndpointConnections(t *testing.T) {
- const (
- UDP = v1.ProtocolUDP
- TCP = v1.ProtocolTCP
- SCTP = v1.ProtocolSCTP
- )
- testCases := []struct {
- description string
- svcName string
- svcIP string
- svcPort int32
- protocol v1.Protocol
- endpoint string // IP:port endpoint
- epSvcPair proxy.ServiceEndpoint // Will be generated by test
- simulatedErr string
- }{
- {
- description: "V4 UDP",
- svcName: "v4-udp",
- svcIP: "10.96.1.1",
- svcPort: 80,
- protocol: UDP,
- endpoint: "10.240.0.3:80",
- }, {
- description: "V4 TCP",
- svcName: "v4-tcp",
- svcIP: "10.96.2.2",
- svcPort: 80,
- protocol: TCP,
- endpoint: "10.240.0.4:80",
- }, {
- description: "V4 SCTP",
- svcName: "v4-sctp",
- svcIP: "10.96.3.3",
- svcPort: 80,
- protocol: SCTP,
- endpoint: "10.240.0.5:80",
- }, {
- description: "V4 UDP, nothing to delete, benign error",
- svcName: "v4-udp-nothing-to-delete",
- svcIP: "10.96.1.1",
- svcPort: 80,
- protocol: UDP,
- endpoint: "10.240.0.3:80",
- simulatedErr: conntrack.NoConnectionToDelete,
- }, {
- description: "V4 UDP, unexpected error, should be glogged",
- svcName: "v4-udp-simulated-error",
- svcIP: "10.96.1.1",
- svcPort: 80,
- protocol: UDP,
- endpoint: "10.240.0.3:80",
- simulatedErr: "simulated error",
- }, {
- description: "V6 UDP",
- svcName: "v6-udp",
- svcIP: "fd00:1234::20",
- svcPort: 80,
- protocol: UDP,
- endpoint: "[2001:db8::2]:80",
- }, {
- description: "V6 TCP",
- svcName: "v6-tcp",
- svcIP: "fd00:1234::30",
- svcPort: 80,
- protocol: TCP,
- endpoint: "[2001:db8::3]:80",
- }, {
- description: "V6 SCTP",
- svcName: "v6-sctp",
- svcIP: "fd00:1234::40",
- svcPort: 80,
- protocol: SCTP,
- endpoint: "[2001:db8::4]:80",
- },
- }
- // Create a fake executor for the conntrack utility. This should only be
- // invoked for UDP connections, since no conntrack cleanup is needed for TCP
- fcmd := fakeexec.FakeCmd{}
- fexec := fakeexec.FakeExec{
- LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
- }
- execFunc := func(cmd string, args ...string) exec.Cmd {
- return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
- }
- for _, tc := range testCases {
- if tc.protocol == UDP {
- var cmdOutput string
- var simErr error
- if tc.simulatedErr == "" {
- cmdOutput = "1 flow entries have been deleted"
- } else {
- simErr = fmt.Errorf(tc.simulatedErr)
- }
- cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }
- fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
- fexec.CommandScript = append(fexec.CommandScript, execFunc)
- }
- }
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- fp.exec = &fexec
- for _, tc := range testCases {
- makeServiceMap(fp,
- makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
- svc.Spec.ClusterIP = tc.svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: "p80",
- Port: tc.svcPort,
- Protocol: tc.protocol,
- }}
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- }),
- )
- proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- }
- // Run the test cases
- for _, tc := range testCases {
- priorExecs := fexec.CommandCalls
- priorGlogErrs := klog.Stats.Error.Lines()
- svc := proxy.ServicePortName{
- NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
- Port: "p80",
- Protocol: tc.protocol,
- }
- input := []proxy.ServiceEndpoint{
- {
- Endpoint: tc.endpoint,
- ServicePortName: svc,
- },
- }
- fp.deleteEndpointConnections(input)
- // For UDP connections, check the executed conntrack command
- var expExecs int
- if tc.protocol == UDP {
- isIPv6 := func(ip string) bool {
- netIP := net.ParseIP(ip)
- return netIP.To4() == nil
- }
- endpointIP := utilproxy.IPPart(tc.endpoint)
- expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
- if isIPv6(endpointIP) {
- expectCommand += " -f ipv6"
- }
- actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
- if actualCommand != expectCommand {
- t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
- }
- expExecs = 1
- }
- // Check the number of times conntrack was executed
- execs := fexec.CommandCalls - priorExecs
- if execs != expExecs {
- t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
- }
- // Check the number of new glog errors
- var expGlogErrs int64
- if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
- expGlogErrs = 1
- }
- glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
- if glogErrs != expGlogErrs {
- t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
- }
- }
- }
- // fakePortOpener implements portOpener.
- type fakePortOpener struct {
- openPorts []*utilproxy.LocalPort
- }
- // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
- // to lock a local port.
- func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
- f.openPorts = append(f.openPorts, lp)
- return nil, nil
- }
- const testHostname = "test-hostname"
- func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
- // TODO: Call NewProxier after refactoring out the goroutine
- // invocation into a Run() method.
- detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/24", ipt)
- p := &Proxier{
- exec: &fakeexec.FakeExec{},
- serviceMap: make(proxy.ServiceMap),
- serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
- endpointsMap: make(proxy.EndpointsMap),
- endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
- iptables: ipt,
- localDetector: detectLocal,
- hostname: testHostname,
- portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
- portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
- serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
- precomputedProbabilities: make([]string, 0, 1001),
- iptablesData: bytes.NewBuffer(nil),
- existingFilterChainsData: bytes.NewBuffer(nil),
- filterChains: bytes.NewBuffer(nil),
- filterRules: bytes.NewBuffer(nil),
- natChains: bytes.NewBuffer(nil),
- natRules: bytes.NewBuffer(nil),
- nodePortAddresses: make([]string, 0),
- networkInterfacer: utilproxytest.NewFakeNetwork(),
- }
- p.setInitialized(true)
- p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
- return p
- }
- func hasSessionAffinityRule(rules []iptablestest.Rule) bool {
- for _, r := range rules {
- if _, ok := r[iptablestest.Recent]; ok {
- return true
- }
- }
- return false
- }
- func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
- destPortStr := strconv.Itoa(destPort)
- match := false
- for _, r := range rules {
- if r[iptablestest.Jump] == destChain {
- match = true
- if destIP != "" {
- if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
- return true
- }
- match = false
- }
- if destPort != 0 {
- if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
- return true
- }
- match = false
- }
- }
- }
- return match
- }
- func hasSrcType(rules []iptablestest.Rule, srcType string) bool {
- for _, r := range rules {
- if r[iptablestest.SrcType] != srcType {
- continue
- }
- return true
- }
- return false
- }
- func hasMasqRandomFully(rules []iptablestest.Rule) bool {
- for _, r := range rules {
- if r[iptablestest.Masquerade] == "--random-fully" {
- return true
- }
- }
- return false
- }
- func TestHasJump(t *testing.T) {
- testCases := map[string]struct {
- rules []iptablestest.Rule
- destChain string
- destIP string
- destPort int
- expected bool
- }{
- "case 1": {
- // Match the 1st rule(both dest IP and dest Port)
- rules: []iptablestest.Rule{
- {"-d ": "10.20.30.41/32", "--dport ": "80", "-p ": "tcp", "-j ": "REJECT"},
- {"--dport ": "3001", "-p ": "tcp", "-j ": "KUBE-MARK-MASQ"},
- },
- destChain: "REJECT",
- destIP: "10.20.30.41",
- destPort: 80,
- expected: true,
- },
- "case 2": {
- // Match the 2nd rule(dest Port)
- rules: []iptablestest.Rule{
- {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
- {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
- },
- destChain: "REJECT",
- destIP: "",
- destPort: 3001,
- expected: true,
- },
- "case 3": {
- // Match both dest IP and dest Port
- rules: []iptablestest.Rule{
- {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
- },
- destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
- destIP: "1.2.3.4",
- destPort: 80,
- expected: true,
- },
- "case 4": {
- // Match dest IP but doesn't match dest Port
- rules: []iptablestest.Rule{
- {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
- },
- destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
- destIP: "1.2.3.4",
- destPort: 8080,
- expected: false,
- },
- "case 5": {
- // Match dest Port but doesn't match dest IP
- rules: []iptablestest.Rule{
- {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
- },
- destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
- destIP: "10.20.30.40",
- destPort: 80,
- expected: false,
- },
- "case 6": {
- // Match the 2nd rule(dest IP)
- rules: []iptablestest.Rule{
- {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
- {"-d ": "1.2.3.4/32", "-p ": "tcp", "-j ": "REJECT"},
- {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
- },
- destChain: "REJECT",
- destIP: "1.2.3.4",
- destPort: 8080,
- expected: true,
- },
- "case 7": {
- // Match the 2nd rule(dest Port)
- rules: []iptablestest.Rule{
- {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
- {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
- },
- destChain: "REJECT",
- destIP: "1.2.3.4",
- destPort: 3001,
- expected: true,
- },
- "case 8": {
- // Match the 1st rule(dest IP)
- rules: []iptablestest.Rule{
- {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
- {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
- },
- destChain: "REJECT",
- destIP: "10.20.30.41",
- destPort: 8080,
- expected: true,
- },
- "case 9": {
- rules: []iptablestest.Rule{
- {"-j ": "KUBE-SEP-LWSOSDSHMKPJHHJV"},
- },
- destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
- destIP: "",
- destPort: 0,
- expected: true,
- },
- "case 10": {
- rules: []iptablestest.Rule{
- {"-j ": "KUBE-SEP-FOO"},
- },
- destChain: "KUBE-SEP-BAR",
- destIP: "",
- destPort: 0,
- expected: false,
- },
- }
- for k, tc := range testCases {
- if got := hasJump(tc.rules, tc.destChain, tc.destIP, tc.destPort); got != tc.expected {
- t.Errorf("%v: expected %v, got %v", k, tc.expected, got)
- }
- }
- }
- func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
- for _, r := range rules {
- if r[iptablestest.ToDest] == endpoint {
- return true
- }
- }
- return false
- }
- func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
- for _, r := range rules {
- t.Logf("%q", r)
- }
- t.Errorf("%v", msg)
- }
- func TestClusterIPReject(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }}
- }),
- )
- makeEndpointsMap(fp)
- fp.syncProxyRules()
- svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
- svcRules := ipt.GetRules(svcChain)
- if len(svcRules) != 0 {
- errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
- }
- kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
- if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
- errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
- }
- }
- func TestClusterIPEndpointsJump(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- Protocol: v1.ProtocolTCP,
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }}
- }),
- )
- epIP := "10.180.0.1"
- makeEndpointsMap(fp,
- makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: epIP,
- }},
- Ports: []v1.EndpointPort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }},
- }}
- }),
- )
- fp.syncProxyRules()
- epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
- svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
- epChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStr))
- kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
- if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
- errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
- }
- svcRules := ipt.GetRules(svcChain)
- if !hasJump(svcRules, epChain, "", 0) {
- errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
- }
- epRules := ipt.GetRules(epChain)
- if !hasDNAT(epRules, epStr) {
- errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
- }
- }
- func TestLoadBalancer(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcNodePort := 3001
- svcLBIP := "1.2.3.4"
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- Protocol: v1.ProtocolTCP,
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "LoadBalancer"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- NodePort: int32(svcNodePort),
- }}
- svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
- IP: svcLBIP,
- }}
- }),
- )
- epIP := "10.180.0.1"
- makeEndpointsMap(fp,
- makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: epIP,
- }},
- Ports: []v1.EndpointPort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }},
- }}
- }),
- )
- fp.syncProxyRules()
- proto := strings.ToLower(string(v1.ProtocolTCP))
- fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
- svcChain := string(servicePortChainName(svcPortName.String(), proto))
- kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
- if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
- errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
- }
- fwRules := ipt.GetRules(fwChain)
- if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
- errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
- }
- }
- func TestNodePort(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcNodePort := 3001
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- Protocol: v1.ProtocolTCP,
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "NodePort"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- NodePort: int32(svcNodePort),
- }}
- }),
- )
- epIP := "10.180.0.1"
- makeEndpointsMap(fp,
- makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: epIP,
- }},
- Ports: []v1.EndpointPort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }},
- }}
- }),
- )
- itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
- addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
- itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
- addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
- fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
- fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
- fp.nodePortAddresses = []string{}
- fp.syncProxyRules()
- proto := strings.ToLower(string(v1.ProtocolTCP))
- svcChain := string(servicePortChainName(svcPortName.String(), proto))
- kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
- if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
- errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
- }
- }
- func TestMasqueradeRule(t *testing.T) {
- for _, testcase := range []bool{false, true} {
- ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
- fp := NewFakeProxier(ipt, false)
- makeServiceMap(fp)
- makeEndpointsMap(fp)
- fp.syncProxyRules()
- postRoutingRules := ipt.GetRules(string(kubePostroutingChain))
- if !hasJump(postRoutingRules, "MASQUERADE", "", 0) {
- errorf(fmt.Sprintf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain), postRoutingRules, t)
- }
- if hasMasqRandomFully(postRoutingRules) != testcase {
- probs := map[bool]string{false: "found", true: "did not find"}
- errorf(fmt.Sprintf("%s --random-fully in -j MASQUERADE rule in %s chain when HasRandomFully()==%v", probs[testcase], kubePostroutingChain, testcase), postRoutingRules, t)
- }
- }
- }
- func TestExternalIPsReject(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcExternalIPs := "50.60.70.81"
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "ClusterIP"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.ExternalIPs = []string{svcExternalIPs}
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- TargetPort: intstr.FromInt(svcPort),
- }}
- }),
- )
- makeEndpointsMap(fp)
- fp.syncProxyRules()
- kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
- if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
- errorf(fmt.Sprintf("Failed to find a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
- }
- }
- func TestNodePortReject(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcNodePort := 3001
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "NodePort"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- NodePort: int32(svcNodePort),
- }}
- }),
- )
- makeEndpointsMap(fp)
- fp.syncProxyRules()
- kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
- if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
- errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
- }
- }
- func TestOnlyLocalLoadBalancing(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- svcIP := "10.20.30.41"
- svcPort := 80
- svcNodePort := 3001
- svcLBIP := "1.2.3.4"
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- Protocol: v1.ProtocolTCP,
- }
- svcSessionAffinityTimeout := int32(10800)
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "LoadBalancer"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- NodePort: int32(svcNodePort),
- }}
- svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
- IP: svcLBIP,
- }}
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
- svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
- ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
- }
- }),
- )
- epIP1 := "10.180.0.1"
- epIP2 := "10.180.2.1"
- epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
- epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
- makeEndpointsMap(fp,
- makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: epIP1,
- NodeName: nil,
- }, {
- IP: epIP2,
- NodeName: utilpointer.StringPtr(testHostname),
- }},
- Ports: []v1.EndpointPort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }},
- }}
- }),
- )
- fp.syncProxyRules()
- proto := strings.ToLower(string(v1.ProtocolTCP))
- fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
- lbChain := string(serviceLBChainName(svcPortName.String(), proto))
- nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
- localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
- kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
- if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
- errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
- }
- fwRules := ipt.GetRules(fwChain)
- if !hasJump(fwRules, lbChain, "", 0) {
- errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
- }
- if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
- errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
- }
- lbRules := ipt.GetRules(lbChain)
- if hasJump(lbRules, nonLocalEpChain, "", 0) {
- errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
- }
- if !hasJump(lbRules, localEpChain, "", 0) {
- errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
- }
- if !hasSessionAffinityRule(lbRules) {
- errorf(fmt.Sprintf("Didn't find session affinity rule from lb chain %v", lbChain), lbRules, t)
- }
- }
- func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- onlyLocalNodePorts(t, fp, ipt)
- }
- func TestOnlyLocalNodePorts(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- onlyLocalNodePorts(t, fp, ipt)
- }
- func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
- svcIP := "10.20.30.41"
- svcPort := 80
- svcNodePort := 3001
- svcPortName := proxy.ServicePortName{
- NamespacedName: makeNSN("ns1", "svc1"),
- Port: "p80",
- Protocol: v1.ProtocolTCP,
- }
- makeServiceMap(fp,
- makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
- svc.Spec.Type = "NodePort"
- svc.Spec.ClusterIP = svcIP
- svc.Spec.Ports = []v1.ServicePort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- NodePort: int32(svcNodePort),
- }}
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- }),
- )
- epIP1 := "10.180.0.1"
- epIP2 := "10.180.2.1"
- epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
- epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
- makeEndpointsMap(fp,
- makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: epIP1,
- NodeName: nil,
- }, {
- IP: epIP2,
- NodeName: utilpointer.StringPtr(testHostname),
- }},
- Ports: []v1.EndpointPort{{
- Name: svcPortName.Port,
- Port: int32(svcPort),
- Protocol: v1.ProtocolTCP,
- }},
- }}
- }),
- )
- itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
- addrs := []net.Addr{utilproxytest.AddrStruct{Val: "10.20.30.51/24"}}
- fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
- fp.nodePortAddresses = []string{"10.20.30.0/24"}
- fp.syncProxyRules()
- proto := strings.ToLower(string(v1.ProtocolTCP))
- lbChain := string(serviceLBChainName(svcPortName.String(), proto))
- nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrLocal))
- localEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrNonLocal))
- kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
- if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
- errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
- }
- if !hasJump(kubeNodePortRules, string(KubeMarkMasqChain), "", svcNodePort) {
- errorf(fmt.Sprintf("Failed to find jump to %s chain for destination IP %d", KubeMarkMasqChain, svcNodePort), kubeNodePortRules, t)
- }
- kubeServiceRules := ipt.GetRules(string(kubeServicesChain))
- if !hasJump(kubeServiceRules, string(kubeNodePortsChain), "10.20.30.51", 0) {
- errorf(fmt.Sprintf("Failed to find jump to KUBE-NODEPORTS chain %v", string(kubeNodePortsChain)), kubeServiceRules, t)
- }
- svcChain := string(servicePortChainName(svcPortName.String(), proto))
- lbRules := ipt.GetRules(lbChain)
- if hasJump(lbRules, nonLocalEpChain, "", 0) {
- errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
- }
- if !hasJump(lbRules, svcChain, "", 0) || !hasSrcType(lbRules, "LOCAL") {
- errorf(fmt.Sprintf("Did not find jump from lb chain %v to svc %v with src-type LOCAL", lbChain, svcChain), lbRules, t)
- }
- if !hasJump(lbRules, localEpChain, "", 0) {
- errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
- }
- }
- func TestComputeProbability(t *testing.T) {
- expectedProbabilities := map[int]string{
- 1: "1.0000000000",
- 2: "0.5000000000",
- 10: "0.1000000000",
- 100: "0.0100000000",
- 1000: "0.0010000000",
- 10000: "0.0001000000",
- 100000: "0.0000100000",
- 100001: "0.0000099999",
- }
- for num, expected := range expectedProbabilities {
- actual := computeProbability(num)
- if actual != expected {
- t.Errorf("Expected computeProbability(%d) to be %s, got: %s", num, expected, actual)
- }
- }
- prevProbability := float64(0)
- for i := 100000; i > 1; i-- {
- currProbability, err := strconv.ParseFloat(computeProbability(i), 64)
- if err != nil {
- t.Fatalf("Error parsing float probability for %d: %v", i, err)
- }
- if currProbability <= prevProbability {
- t.Fatalf("Probability unexpectedly <= to previous probability for %d: (%0.10f <= %0.10f)", i, currProbability, prevProbability)
- }
- prevProbability = currProbability
- }
- }
- func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
- svc := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: namespace,
- Annotations: map[string]string{},
- },
- Spec: v1.ServiceSpec{},
- Status: v1.ServiceStatus{},
- }
- svcFunc(svc)
- return svc
- }
- func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
- svcPort := v1.ServicePort{
- Name: name,
- Protocol: protocol,
- Port: port,
- NodePort: nodeport,
- TargetPort: intstr.FromInt(targetPort),
- }
- return append(array, svcPort)
- }
- func TestBuildServiceMapAddRemove(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- services := []*v1.Service{
- makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeClusterIP
- svc.Spec.ClusterIP = "172.16.55.4"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpport", "SCTP", 1236, 6321, 0)
- }),
- makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeNodePort
- svc.Spec.ClusterIP = "172.16.55.10"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "muchmoreblah", "SCTP", 343, 676, 0)
- }),
- makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- svc.Spec.ClusterIP = "172.16.55.11"
- svc.Spec.LoadBalancerIP = "5.6.7.8"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
- svc.Status.LoadBalancer = v1.LoadBalancerStatus{
- Ingress: []v1.LoadBalancerIngress{
- {IP: "10.1.2.4"},
- },
- }
- }),
- makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- svc.Spec.ClusterIP = "172.16.55.12"
- svc.Spec.LoadBalancerIP = "5.6.7.8"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
- svc.Status.LoadBalancer = v1.LoadBalancerStatus{
- Ingress: []v1.LoadBalancerIngress{
- {IP: "10.1.2.3"},
- },
- }
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- svc.Spec.HealthCheckNodePort = 345
- }),
- }
- for i := range services {
- fp.OnServiceAdd(services[i])
- }
- result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 10 {
- t.Errorf("expected service map length 10, got %v", fp.serviceMap)
- }
- // The only-local-loadbalancer ones get added
- if len(result.HCServiceNodePorts) != 1 {
- t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
- } else {
- nsn := makeNSN("somewhere", "only-local-load-balancer")
- if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
- t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
- }
- }
- if len(result.UDPStaleClusterIP) != 0 {
- // Services only added, so nothing stale yet
- t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
- }
- // Remove some stuff
- // oneService is a modification of services[0] with removed first port.
- oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeClusterIP
- svc.Spec.ClusterIP = "172.16.55.4"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
- })
- fp.OnServiceUpdate(services[0], oneService)
- fp.OnServiceDelete(services[1])
- fp.OnServiceDelete(services[2])
- fp.OnServiceDelete(services[3])
- result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 1 {
- t.Errorf("expected service map length 1, got %v", fp.serviceMap)
- }
- if len(result.HCServiceNodePorts) != 0 {
- t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
- }
- // All services but one were deleted. While you'd expect only the ClusterIPs
- // from the three deleted services here, we still have the ClusterIP for
- // the not-deleted service, because one of it's ServicePorts was deleted.
- expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
- if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
- t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
- }
- for _, ip := range expectedStaleUDPServices {
- if !result.UDPStaleClusterIP.Has(ip) {
- t.Errorf("expected stale UDP service service %s", ip)
- }
- }
- }
- func TestBuildServiceMapServiceHeadless(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- makeServiceMap(fp,
- makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeClusterIP
- svc.Spec.ClusterIP = v1.ClusterIPNone
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
- }),
- makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeClusterIP
- svc.Spec.ClusterIP = v1.ClusterIPNone
- }),
- )
- // Headless service should be ignored
- result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 0 {
- t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
- }
- // No proxied services, so no healthchecks
- if len(result.HCServiceNodePorts) != 0 {
- t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
- }
- if len(result.UDPStaleClusterIP) != 0 {
- t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
- }
- }
- func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- makeServiceMap(fp,
- makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeExternalName
- svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
- svc.Spec.ExternalName = "foo2.bar.com"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
- }),
- )
- result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 0 {
- t.Errorf("expected service map length 0, got %v", fp.serviceMap)
- }
- // No proxied services, so no healthchecks
- if len(result.HCServiceNodePorts) != 0 {
- t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
- }
- if len(result.UDPStaleClusterIP) != 0 {
- t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
- }
- }
- func TestBuildServiceMapServiceUpdate(t *testing.T) {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeClusterIP
- svc.Spec.ClusterIP = "172.16.55.4"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
- })
- servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
- svc.Spec.Type = v1.ServiceTypeLoadBalancer
- svc.Spec.ClusterIP = "172.16.55.4"
- svc.Spec.LoadBalancerIP = "5.6.7.8"
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
- svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
- svc.Status.LoadBalancer = v1.LoadBalancerStatus{
- Ingress: []v1.LoadBalancerIngress{
- {IP: "10.1.2.3"},
- },
- }
- svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
- svc.Spec.HealthCheckNodePort = 345
- })
- fp.OnServiceAdd(servicev1)
- result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 2 {
- t.Errorf("expected service map length 2, got %v", fp.serviceMap)
- }
- if len(result.HCServiceNodePorts) != 0 {
- t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
- }
- if len(result.UDPStaleClusterIP) != 0 {
- // Services only added, so nothing stale yet
- t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
- }
- // Change service to load-balancer
- fp.OnServiceUpdate(servicev1, servicev2)
- result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 2 {
- t.Errorf("expected service map length 2, got %v", fp.serviceMap)
- }
- if len(result.HCServiceNodePorts) != 1 {
- t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
- }
- if len(result.UDPStaleClusterIP) != 0 {
- t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
- }
- // No change; make sure the service map stays the same and there are
- // no health-check changes
- fp.OnServiceUpdate(servicev2, servicev2)
- result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 2 {
- t.Errorf("expected service map length 2, got %v", fp.serviceMap)
- }
- if len(result.HCServiceNodePorts) != 1 {
- t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
- }
- if len(result.UDPStaleClusterIP) != 0 {
- t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
- }
- // And back to ClusterIP
- fp.OnServiceUpdate(servicev2, servicev1)
- result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
- if len(fp.serviceMap) != 2 {
- t.Errorf("expected service map length 2, got %v", fp.serviceMap)
- }
- if len(result.HCServiceNodePorts) != 0 {
- t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
- }
- if len(result.UDPStaleClusterIP) != 0 {
- // Services only added, so nothing stale yet
- t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
- }
- }
- func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
- ept := &v1.Endpoints{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: namespace,
- },
- }
- eptFunc(ept)
- return ept
- }
- func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
- for i := range allEndpoints {
- proxier.OnEndpointsAdd(allEndpoints[i])
- }
- proxier.mu.Lock()
- defer proxier.mu.Unlock()
- proxier.endpointsSynced = true
- }
- func makeNSN(namespace, name string) types.NamespacedName {
- return types.NamespacedName{Namespace: namespace, Name: name}
- }
- func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
- return proxy.ServicePortName{
- NamespacedName: makeNSN(ns, name),
- Port: port,
- Protocol: protocol,
- }
- }
- func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
- for i := range allServices {
- proxier.OnServiceAdd(allServices[i])
- }
- proxier.mu.Lock()
- defer proxier.mu.Unlock()
- proxier.servicesSynced = true
- }
- func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
- if len(newMap) != len(expected) {
- t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
- }
- for x := range expected {
- if len(newMap[x]) != len(expected[x]) {
- t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
- } else {
- for i := range expected[x] {
- newEp, ok := newMap[x][i].(*endpointsInfo)
- if !ok {
- t.Errorf("Failed to cast endpointsInfo")
- continue
- }
- if newEp.Endpoint != expected[x][i].Endpoint ||
- newEp.IsLocal != expected[x][i].IsLocal ||
- newEp.protocol != expected[x][i].protocol ||
- newEp.chainName != expected[x][i].chainName {
- t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
- }
- }
- }
- }
- }
- func Test_updateEndpointsMap(t *testing.T) {
- var nodeName = testHostname
- emptyEndpoint := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{}
- }
- unnamedPort := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- unnamedPortLocal := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- namedPortLocal := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- namedPort := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- namedPortRenamed := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11-2",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- namedPortRenumbered := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 22,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- namedPortsLocalNoLocal := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }, {
- IP: "1.1.1.2",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- multipleSubsets := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.2",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- multipleSubsetsWithLocal := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.2",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.3",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p13",
- Port: 13,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }, {
- IP: "1.1.1.2",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.3",
- }, {
- IP: "1.1.1.4",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p13",
- Port: 13,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p14",
- Port: 14,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "2.2.2.1",
- }, {
- IP: "2.2.2.2",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p21",
- Port: 21,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p22",
- Port: 22,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexBefore1 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexBefore2 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "2.2.2.2",
- NodeName: &nodeName,
- }, {
- IP: "2.2.2.22",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p22",
- Port: 22,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "2.2.2.3",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p23",
- Port: 23,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexBefore4 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "4.4.4.4",
- NodeName: &nodeName,
- }, {
- IP: "4.4.4.5",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p44",
- Port: 44,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "4.4.4.6",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p45",
- Port: 45,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexAfter1 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.1",
- }, {
- IP: "1.1.1.11",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p11",
- Port: 11,
- Protocol: v1.ProtocolUDP,
- }},
- }, {
- Addresses: []v1.EndpointAddress{{
- IP: "1.1.1.2",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p12",
- Port: 12,
- Protocol: v1.ProtocolUDP,
- }, {
- Name: "p122",
- Port: 122,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexAfter3 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "3.3.3.3",
- }},
- Ports: []v1.EndpointPort{{
- Name: "p33",
- Port: 33,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- complexAfter4 := func(ept *v1.Endpoints) {
- ept.Subsets = []v1.EndpointSubset{{
- Addresses: []v1.EndpointAddress{{
- IP: "4.4.4.4",
- NodeName: &nodeName,
- }},
- Ports: []v1.EndpointPort{{
- Name: "p44",
- Port: 44,
- Protocol: v1.ProtocolUDP,
- }},
- }}
- }
- testCases := []struct {
- // previousEndpoints and currentEndpoints are used to call appropriate
- // handlers OnEndpoints* (based on whether corresponding values are nil
- // or non-nil) and must be of equal length.
- previousEndpoints []*v1.Endpoints
- currentEndpoints []*v1.Endpoints
- oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
- expectedResult map[proxy.ServicePortName][]*endpointsInfo
- expectedStaleEndpoints []proxy.ServiceEndpoint
- expectedStaleServiceNames map[proxy.ServicePortName]bool
- expectedHealthchecks map[types.NamespacedName]int
- }{{
- // Case[0]: nothing
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[1]: no change, unnamed port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", unnamedPort),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", unnamedPort),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[2]: no change, named port, local
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortLocal),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortLocal),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 1,
- },
- }, {
- // Case[3]: no change, multiple subsets
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsets),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsets),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[4]: no change, multiple subsets, multiple ports, local
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 1,
- },
- }, {
- // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
- makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
- makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
- },
- makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
- },
- makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
- },
- makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
- },
- makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 2,
- makeNSN("ns2", "ep2"): 1,
- },
- }, {
- // Case[6]: add an Endpoints
- previousEndpoints: []*v1.Endpoints{
- nil,
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 1,
- },
- }, {
- // Case[7]: remove an Endpoints
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
- },
- currentEndpoints: []*v1.Endpoints{
- nil,
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "1.1.1.1:11",
- ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[8]: add an IP and port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 1,
- },
- }, {
- // Case[9]: remove an IP and port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "1.1.1.2:11",
- ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
- }, {
- Endpoint: "1.1.1.1:12",
- ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
- }, {
- Endpoint: "1.1.1.2:12",
- ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[10]: add a subset
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns1", "ep1"): 1,
- },
- }, {
- // Case[11]: remove a subset
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", multipleSubsets),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "1.1.1.2:12",
- ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[12]: rename a port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortRenamed),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "1.1.1.1:11",
- ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[13]: renumber a port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPort),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:22", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "1.1.1.1:11",
- ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
- expectedHealthchecks: map[types.NamespacedName]int{},
- }, {
- // Case[14]: complex add and remove
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", complexBefore1),
- makeTestEndpoints("ns2", "ep2", complexBefore2),
- nil,
- makeTestEndpoints("ns4", "ep4", complexBefore4),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", complexAfter1),
- nil,
- makeTestEndpoints("ns3", "ep3", complexAfter3),
- makeTestEndpoints("ns4", "ep4", complexAfter4),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true}},
- },
- makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true}},
- },
- makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: true}},
- },
- makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true}},
- },
- },
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.11:11", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
- },
- makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:122", IsLocal: false}},
- },
- makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "3.3.3.3:33", IsLocal: false}},
- },
- makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{{
- Endpoint: "2.2.2.2:22",
- ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
- }, {
- Endpoint: "2.2.2.22:22",
- ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
- }, {
- Endpoint: "2.2.2.3:23",
- ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
- }, {
- Endpoint: "4.4.4.5:44",
- ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
- }, {
- Endpoint: "4.4.4.6:45",
- ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
- }},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
- makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
- makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{
- makeNSN("ns4", "ep4"): 1,
- },
- }, {
- // Case[15]: change from 0 endpoint address to 1 unnamed port
- previousEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", emptyEndpoint),
- },
- currentEndpoints: []*v1.Endpoints{
- makeTestEndpoints("ns1", "ep1", unnamedPort),
- },
- oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
- expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
- {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
- },
- },
- expectedStaleEndpoints: []proxy.ServiceEndpoint{},
- expectedStaleServiceNames: map[proxy.ServicePortName]bool{
- makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
- },
- expectedHealthchecks: map[types.NamespacedName]int{},
- },
- }
- for tci, tc := range testCases {
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, false)
- fp.hostname = nodeName
- // First check that after adding all previous versions of endpoints,
- // the fp.oldEndpoints is as we expect.
- for i := range tc.previousEndpoints {
- if tc.previousEndpoints[i] != nil {
- fp.OnEndpointsAdd(tc.previousEndpoints[i])
- }
- }
- fp.endpointsMap.Update(fp.endpointsChanges)
- compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
- // Now let's call appropriate handlers to get to state we want to be.
- if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
- t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
- continue
- }
- for i := range tc.previousEndpoints {
- prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
- switch {
- case prev == nil:
- fp.OnEndpointsAdd(curr)
- case curr == nil:
- fp.OnEndpointsDelete(prev)
- default:
- fp.OnEndpointsUpdate(prev, curr)
- }
- }
- result := fp.endpointsMap.Update(fp.endpointsChanges)
- newMap := fp.endpointsMap
- compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
- if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
- t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
- }
- for _, x := range tc.expectedStaleEndpoints {
- found := false
- for _, stale := range result.StaleEndpoints {
- if stale == x {
- found = true
- break
- }
- }
- if !found {
- t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
- }
- }
- if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
- t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
- }
- for svcName := range tc.expectedStaleServiceNames {
- found := false
- for _, stale := range result.StaleServiceNames {
- if stale == svcName {
- found = true
- }
- }
- if !found {
- t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
- }
- }
- if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
- t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
- }
- }
- }
- // The majority of EndpointSlice specific tests are not iptables specific and focus on
- // the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
- // iptables proxier supports translating EndpointSlices to iptables output.
- func TestEndpointSliceE2E(t *testing.T) {
- expectedIPTablesWithSlice := `*filter
- :KUBE-SERVICES - [0:0]
- :KUBE-EXTERNAL-SERVICES - [0:0]
- :KUBE-FORWARD - [0:0]
- -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
- -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
- -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
- -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
- COMMIT
- *nat
- :KUBE-SERVICES - [0:0]
- :KUBE-NODEPORTS - [0:0]
- :KUBE-POSTROUTING - [0:0]
- :KUBE-MARK-MASQ - [0:0]
- :KUBE-SVC-AHZNAGK3SCETOS2T - [0:0]
- :KUBE-SEP-PXD6POUVGD2I37UY - [0:0]
- :KUBE-SEP-SOKZUIT7SCEVIP33 - [0:0]
- :KUBE-SEP-WVE3FAB34S7NZGDJ - [0:0]
- -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE
- -A KUBE-MARK-MASQ -j MARK --set-xmark
- -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
- -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AHZNAGK3SCETOS2T
- -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-PXD6POUVGD2I37UY
- -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -s 10.0.1.1/32 -j KUBE-MARK-MASQ
- -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
- -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-SOKZUIT7SCEVIP33
- -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -s 10.0.1.2/32 -j KUBE-MARK-MASQ
- -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
- -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -j KUBE-SEP-WVE3FAB34S7NZGDJ
- -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -s 10.0.1.3/32 -j KUBE-MARK-MASQ
- -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
- -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
- COMMIT
- `
- ipt := iptablestest.NewFake()
- fp := NewFakeProxier(ipt, true)
- fp.OnServiceSynced()
- fp.OnEndpointsSynced()
- fp.OnEndpointSlicesSynced()
- serviceName := "svc1"
- namespaceName := "ns1"
- fp.OnServiceAdd(&v1.Service{
- ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
- Spec: v1.ServiceSpec{
- ClusterIP: "172.20.1.1",
- Selector: map[string]string{"foo": "bar"},
- Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
- },
- })
- tcpProtocol := v1.ProtocolTCP
- endpointSlice := &discovery.EndpointSlice{
- ObjectMeta: metav1.ObjectMeta{
- Name: fmt.Sprintf("%s-1", serviceName),
- Namespace: namespaceName,
- Labels: map[string]string{discovery.LabelServiceName: serviceName},
- },
- Ports: []discovery.EndpointPort{{
- Name: utilpointer.StringPtr(""),
- Port: utilpointer.Int32Ptr(80),
- Protocol: &tcpProtocol,
- }},
- AddressType: discovery.AddressTypeIPv4,
- Endpoints: []discovery.Endpoint{{
- Addresses: []string{"10.0.1.1"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{"kubernetes.io/hostname": testHostname},
- }, {
- Addresses: []string{"10.0.1.2"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{"kubernetes.io/hostname": "node2"},
- }, {
- Addresses: []string{"10.0.1.3"},
- Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
- Topology: map[string]string{"kubernetes.io/hostname": "node3"},
- }},
- }
- fp.OnEndpointSliceAdd(endpointSlice)
- fp.syncProxyRules()
- assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String())
- fp.OnEndpointSliceDelete(endpointSlice)
- fp.syncProxyRules()
- assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
- }
- // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|