proxier_test.go 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package iptables
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "strconv"
  20. "strings"
  21. "testing"
  22. "time"
  23. "k8s.io/klog"
  24. "github.com/stretchr/testify/assert"
  25. v1 "k8s.io/api/core/v1"
  26. discovery "k8s.io/api/discovery/v1beta1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/util/intstr"
  30. "k8s.io/kubernetes/pkg/proxy"
  31. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  32. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  33. proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
  34. utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
  35. "k8s.io/kubernetes/pkg/util/async"
  36. "k8s.io/kubernetes/pkg/util/conntrack"
  37. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  38. iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
  39. "k8s.io/utils/exec"
  40. fakeexec "k8s.io/utils/exec/testing"
  41. utilpointer "k8s.io/utils/pointer"
  42. )
  43. func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
  44. chainLines := utiliptables.GetChainLines(table, save)
  45. for chain, lineBytes := range chainLines {
  46. line := string(lineBytes)
  47. if expected, exists := expectedLines[chain]; exists {
  48. if expected != line {
  49. t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
  50. }
  51. } else {
  52. t.Errorf("getChainLines expected chain not present: %s", chain)
  53. }
  54. }
  55. }
  56. func TestGetChainLines(t *testing.T) {
  57. iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
  58. *nat
  59. :PREROUTING ACCEPT [2136997:197881818]
  60. :POSTROUTING ACCEPT [4284525:258542680]
  61. :OUTPUT ACCEPT [5901660:357267963]
  62. -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
  63. COMMIT
  64. # Completed on Wed Oct 29 14:56:01 2014`
  65. expected := map[utiliptables.Chain]string{
  66. utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2136997:197881818]",
  67. utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
  68. utiliptables.ChainOutput: ":OUTPUT ACCEPT [5901660:357267963]",
  69. }
  70. checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
  71. }
  72. func TestGetChainLinesMultipleTables(t *testing.T) {
  73. iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
  74. *nat
  75. :PREROUTING ACCEPT [2:138]
  76. :INPUT ACCEPT [0:0]
  77. :OUTPUT ACCEPT [0:0]
  78. :POSTROUTING ACCEPT [0:0]
  79. :DOCKER - [0:0]
  80. :KUBE-NODEPORT-CONTAINER - [0:0]
  81. :KUBE-NODEPORT-HOST - [0:0]
  82. :KUBE-PORTALS-CONTAINER - [0:0]
  83. :KUBE-PORTALS-HOST - [0:0]
  84. :KUBE-SVC-1111111111111111 - [0:0]
  85. :KUBE-SVC-2222222222222222 - [0:0]
  86. :KUBE-SVC-3333333333333333 - [0:0]
  87. :KUBE-SVC-4444444444444444 - [0:0]
  88. :KUBE-SVC-5555555555555555 - [0:0]
  89. :KUBE-SVC-6666666666666666 - [0:0]
  90. -A PREROUTING -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-CONTAINER
  91. -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
  92. -A PREROUTING -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-CONTAINER
  93. -A OUTPUT -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-HOST
  94. -A OUTPUT ! -d 127.0.0.0/8 -m addrtype --dst-type LOCAL -j DOCKER
  95. -A OUTPUT -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-HOST
  96. -A POSTROUTING -s 10.246.1.0/24 ! -o cbr0 -j MASQUERADE
  97. -A POSTROUTING -s 10.0.2.15/32 -d 10.0.2.15/32 -m comment --comment "handle pod connecting to self" -j MASQUERADE
  98. -A KUBE-PORTALS-CONTAINER -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
  99. -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
  100. -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
  101. -A KUBE-PORTALS-HOST -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
  102. -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
  103. -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
  104. -A KUBE-SVC-1111111111111111 -p udp -m comment --comment "kube-system/kube-dns:dns" -m recent --set --name KUBE-SVC-1111111111111111 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
  105. -A KUBE-SVC-2222222222222222 -m comment --comment "kube-system/kube-dns:dns-tcp" -j KUBE-SVC-3333333333333333
  106. -A KUBE-SVC-3333333333333333 -p tcp -m comment --comment "kube-system/kube-dns:dns-tcp" -m recent --set --name KUBE-SVC-3333333333333333 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
  107. -A KUBE-SVC-4444444444444444 -p tcp -m comment --comment "default/kubernetes:" -m recent --set --name KUBE-SVC-4444444444444444 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.245.1.2:443
  108. -A KUBE-SVC-5555555555555555 -m comment --comment "default/kubernetes:" -j KUBE-SVC-4444444444444444
  109. -A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
  110. COMMIT
  111. # Completed on Fri Aug 7 14:47:37 2015
  112. # Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
  113. *filter
  114. :INPUT ACCEPT [17514:83115836]
  115. :FORWARD ACCEPT [0:0]
  116. :OUTPUT ACCEPT [8909:688225]
  117. :DOCKER - [0:0]
  118. -A FORWARD -o cbr0 -j DOCKER
  119. -A FORWARD -o cbr0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  120. -A FORWARD -i cbr0 ! -o cbr0 -j ACCEPT
  121. -A FORWARD -i cbr0 -o cbr0 -j ACCEPT
  122. COMMIT
  123. `
  124. expected := map[utiliptables.Chain]string{
  125. utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2:138]",
  126. utiliptables.Chain("INPUT"): ":INPUT ACCEPT [0:0]",
  127. utiliptables.Chain("OUTPUT"): ":OUTPUT ACCEPT [0:0]",
  128. utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [0:0]",
  129. utiliptables.Chain("DOCKER"): ":DOCKER - [0:0]",
  130. utiliptables.Chain("KUBE-NODEPORT-CONTAINER"): ":KUBE-NODEPORT-CONTAINER - [0:0]",
  131. utiliptables.Chain("KUBE-NODEPORT-HOST"): ":KUBE-NODEPORT-HOST - [0:0]",
  132. utiliptables.Chain("KUBE-PORTALS-CONTAINER"): ":KUBE-PORTALS-CONTAINER - [0:0]",
  133. utiliptables.Chain("KUBE-PORTALS-HOST"): ":KUBE-PORTALS-HOST - [0:0]",
  134. utiliptables.Chain("KUBE-SVC-1111111111111111"): ":KUBE-SVC-1111111111111111 - [0:0]",
  135. utiliptables.Chain("KUBE-SVC-2222222222222222"): ":KUBE-SVC-2222222222222222 - [0:0]",
  136. utiliptables.Chain("KUBE-SVC-3333333333333333"): ":KUBE-SVC-3333333333333333 - [0:0]",
  137. utiliptables.Chain("KUBE-SVC-4444444444444444"): ":KUBE-SVC-4444444444444444 - [0:0]",
  138. utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
  139. utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
  140. }
  141. checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
  142. }
  143. func TestDeleteEndpointConnections(t *testing.T) {
  144. const (
  145. UDP = v1.ProtocolUDP
  146. TCP = v1.ProtocolTCP
  147. SCTP = v1.ProtocolSCTP
  148. )
  149. testCases := []struct {
  150. description string
  151. svcName string
  152. svcIP string
  153. svcPort int32
  154. protocol v1.Protocol
  155. endpoint string // IP:port endpoint
  156. epSvcPair proxy.ServiceEndpoint // Will be generated by test
  157. simulatedErr string
  158. }{
  159. {
  160. description: "V4 UDP",
  161. svcName: "v4-udp",
  162. svcIP: "10.96.1.1",
  163. svcPort: 80,
  164. protocol: UDP,
  165. endpoint: "10.240.0.3:80",
  166. }, {
  167. description: "V4 TCP",
  168. svcName: "v4-tcp",
  169. svcIP: "10.96.2.2",
  170. svcPort: 80,
  171. protocol: TCP,
  172. endpoint: "10.240.0.4:80",
  173. }, {
  174. description: "V4 SCTP",
  175. svcName: "v4-sctp",
  176. svcIP: "10.96.3.3",
  177. svcPort: 80,
  178. protocol: SCTP,
  179. endpoint: "10.240.0.5:80",
  180. }, {
  181. description: "V4 UDP, nothing to delete, benign error",
  182. svcName: "v4-udp-nothing-to-delete",
  183. svcIP: "10.96.1.1",
  184. svcPort: 80,
  185. protocol: UDP,
  186. endpoint: "10.240.0.3:80",
  187. simulatedErr: conntrack.NoConnectionToDelete,
  188. }, {
  189. description: "V4 UDP, unexpected error, should be glogged",
  190. svcName: "v4-udp-simulated-error",
  191. svcIP: "10.96.1.1",
  192. svcPort: 80,
  193. protocol: UDP,
  194. endpoint: "10.240.0.3:80",
  195. simulatedErr: "simulated error",
  196. }, {
  197. description: "V6 UDP",
  198. svcName: "v6-udp",
  199. svcIP: "fd00:1234::20",
  200. svcPort: 80,
  201. protocol: UDP,
  202. endpoint: "[2001:db8::2]:80",
  203. }, {
  204. description: "V6 TCP",
  205. svcName: "v6-tcp",
  206. svcIP: "fd00:1234::30",
  207. svcPort: 80,
  208. protocol: TCP,
  209. endpoint: "[2001:db8::3]:80",
  210. }, {
  211. description: "V6 SCTP",
  212. svcName: "v6-sctp",
  213. svcIP: "fd00:1234::40",
  214. svcPort: 80,
  215. protocol: SCTP,
  216. endpoint: "[2001:db8::4]:80",
  217. },
  218. }
  219. // Create a fake executor for the conntrack utility. This should only be
  220. // invoked for UDP connections, since no conntrack cleanup is needed for TCP
  221. fcmd := fakeexec.FakeCmd{}
  222. fexec := fakeexec.FakeExec{
  223. LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
  224. }
  225. execFunc := func(cmd string, args ...string) exec.Cmd {
  226. return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
  227. }
  228. for _, tc := range testCases {
  229. if tc.protocol == UDP {
  230. var cmdOutput string
  231. var simErr error
  232. if tc.simulatedErr == "" {
  233. cmdOutput = "1 flow entries have been deleted"
  234. } else {
  235. simErr = fmt.Errorf(tc.simulatedErr)
  236. }
  237. cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }
  238. fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
  239. fexec.CommandScript = append(fexec.CommandScript, execFunc)
  240. }
  241. }
  242. ipt := iptablestest.NewFake()
  243. fp := NewFakeProxier(ipt, false)
  244. fp.exec = &fexec
  245. for _, tc := range testCases {
  246. makeServiceMap(fp,
  247. makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
  248. svc.Spec.ClusterIP = tc.svcIP
  249. svc.Spec.Ports = []v1.ServicePort{{
  250. Name: "p80",
  251. Port: tc.svcPort,
  252. Protocol: tc.protocol,
  253. }}
  254. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  255. }),
  256. )
  257. proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  258. }
  259. // Run the test cases
  260. for _, tc := range testCases {
  261. priorExecs := fexec.CommandCalls
  262. priorGlogErrs := klog.Stats.Error.Lines()
  263. svc := proxy.ServicePortName{
  264. NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
  265. Port: "p80",
  266. Protocol: tc.protocol,
  267. }
  268. input := []proxy.ServiceEndpoint{
  269. {
  270. Endpoint: tc.endpoint,
  271. ServicePortName: svc,
  272. },
  273. }
  274. fp.deleteEndpointConnections(input)
  275. // For UDP connections, check the executed conntrack command
  276. var expExecs int
  277. if tc.protocol == UDP {
  278. isIPv6 := func(ip string) bool {
  279. netIP := net.ParseIP(ip)
  280. return netIP.To4() == nil
  281. }
  282. endpointIP := utilproxy.IPPart(tc.endpoint)
  283. expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
  284. if isIPv6(endpointIP) {
  285. expectCommand += " -f ipv6"
  286. }
  287. actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
  288. if actualCommand != expectCommand {
  289. t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
  290. }
  291. expExecs = 1
  292. }
  293. // Check the number of times conntrack was executed
  294. execs := fexec.CommandCalls - priorExecs
  295. if execs != expExecs {
  296. t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
  297. }
  298. // Check the number of new glog errors
  299. var expGlogErrs int64
  300. if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
  301. expGlogErrs = 1
  302. }
  303. glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
  304. if glogErrs != expGlogErrs {
  305. t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
  306. }
  307. }
  308. }
  309. // fakePortOpener implements portOpener.
  310. type fakePortOpener struct {
  311. openPorts []*utilproxy.LocalPort
  312. }
  313. // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
  314. // to lock a local port.
  315. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
  316. f.openPorts = append(f.openPorts, lp)
  317. return nil, nil
  318. }
  319. const testHostname = "test-hostname"
  320. func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
  321. // TODO: Call NewProxier after refactoring out the goroutine
  322. // invocation into a Run() method.
  323. detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/24", ipt)
  324. p := &Proxier{
  325. exec: &fakeexec.FakeExec{},
  326. serviceMap: make(proxy.ServiceMap),
  327. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
  328. endpointsMap: make(proxy.EndpointsMap),
  329. endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
  330. iptables: ipt,
  331. localDetector: detectLocal,
  332. hostname: testHostname,
  333. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  334. portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
  335. serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
  336. precomputedProbabilities: make([]string, 0, 1001),
  337. iptablesData: bytes.NewBuffer(nil),
  338. existingFilterChainsData: bytes.NewBuffer(nil),
  339. filterChains: bytes.NewBuffer(nil),
  340. filterRules: bytes.NewBuffer(nil),
  341. natChains: bytes.NewBuffer(nil),
  342. natRules: bytes.NewBuffer(nil),
  343. nodePortAddresses: make([]string, 0),
  344. networkInterfacer: utilproxytest.NewFakeNetwork(),
  345. }
  346. p.setInitialized(true)
  347. p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
  348. return p
  349. }
  350. func hasSessionAffinityRule(rules []iptablestest.Rule) bool {
  351. for _, r := range rules {
  352. if _, ok := r[iptablestest.Recent]; ok {
  353. return true
  354. }
  355. }
  356. return false
  357. }
  358. func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
  359. destPortStr := strconv.Itoa(destPort)
  360. match := false
  361. for _, r := range rules {
  362. if r[iptablestest.Jump] == destChain {
  363. match = true
  364. if destIP != "" {
  365. if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
  366. return true
  367. }
  368. match = false
  369. }
  370. if destPort != 0 {
  371. if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
  372. return true
  373. }
  374. match = false
  375. }
  376. }
  377. }
  378. return match
  379. }
  380. func hasSrcType(rules []iptablestest.Rule, srcType string) bool {
  381. for _, r := range rules {
  382. if r[iptablestest.SrcType] != srcType {
  383. continue
  384. }
  385. return true
  386. }
  387. return false
  388. }
  389. func hasMasqRandomFully(rules []iptablestest.Rule) bool {
  390. for _, r := range rules {
  391. if r[iptablestest.Masquerade] == "--random-fully" {
  392. return true
  393. }
  394. }
  395. return false
  396. }
  397. func TestHasJump(t *testing.T) {
  398. testCases := map[string]struct {
  399. rules []iptablestest.Rule
  400. destChain string
  401. destIP string
  402. destPort int
  403. expected bool
  404. }{
  405. "case 1": {
  406. // Match the 1st rule(both dest IP and dest Port)
  407. rules: []iptablestest.Rule{
  408. {"-d ": "10.20.30.41/32", "--dport ": "80", "-p ": "tcp", "-j ": "REJECT"},
  409. {"--dport ": "3001", "-p ": "tcp", "-j ": "KUBE-MARK-MASQ"},
  410. },
  411. destChain: "REJECT",
  412. destIP: "10.20.30.41",
  413. destPort: 80,
  414. expected: true,
  415. },
  416. "case 2": {
  417. // Match the 2nd rule(dest Port)
  418. rules: []iptablestest.Rule{
  419. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  420. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  421. },
  422. destChain: "REJECT",
  423. destIP: "",
  424. destPort: 3001,
  425. expected: true,
  426. },
  427. "case 3": {
  428. // Match both dest IP and dest Port
  429. rules: []iptablestest.Rule{
  430. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  431. },
  432. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  433. destIP: "1.2.3.4",
  434. destPort: 80,
  435. expected: true,
  436. },
  437. "case 4": {
  438. // Match dest IP but doesn't match dest Port
  439. rules: []iptablestest.Rule{
  440. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  441. },
  442. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  443. destIP: "1.2.3.4",
  444. destPort: 8080,
  445. expected: false,
  446. },
  447. "case 5": {
  448. // Match dest Port but doesn't match dest IP
  449. rules: []iptablestest.Rule{
  450. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  451. },
  452. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  453. destIP: "10.20.30.40",
  454. destPort: 80,
  455. expected: false,
  456. },
  457. "case 6": {
  458. // Match the 2nd rule(dest IP)
  459. rules: []iptablestest.Rule{
  460. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  461. {"-d ": "1.2.3.4/32", "-p ": "tcp", "-j ": "REJECT"},
  462. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  463. },
  464. destChain: "REJECT",
  465. destIP: "1.2.3.4",
  466. destPort: 8080,
  467. expected: true,
  468. },
  469. "case 7": {
  470. // Match the 2nd rule(dest Port)
  471. rules: []iptablestest.Rule{
  472. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  473. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  474. },
  475. destChain: "REJECT",
  476. destIP: "1.2.3.4",
  477. destPort: 3001,
  478. expected: true,
  479. },
  480. "case 8": {
  481. // Match the 1st rule(dest IP)
  482. rules: []iptablestest.Rule{
  483. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  484. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  485. },
  486. destChain: "REJECT",
  487. destIP: "10.20.30.41",
  488. destPort: 8080,
  489. expected: true,
  490. },
  491. "case 9": {
  492. rules: []iptablestest.Rule{
  493. {"-j ": "KUBE-SEP-LWSOSDSHMKPJHHJV"},
  494. },
  495. destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
  496. destIP: "",
  497. destPort: 0,
  498. expected: true,
  499. },
  500. "case 10": {
  501. rules: []iptablestest.Rule{
  502. {"-j ": "KUBE-SEP-FOO"},
  503. },
  504. destChain: "KUBE-SEP-BAR",
  505. destIP: "",
  506. destPort: 0,
  507. expected: false,
  508. },
  509. }
  510. for k, tc := range testCases {
  511. if got := hasJump(tc.rules, tc.destChain, tc.destIP, tc.destPort); got != tc.expected {
  512. t.Errorf("%v: expected %v, got %v", k, tc.expected, got)
  513. }
  514. }
  515. }
  516. func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
  517. for _, r := range rules {
  518. if r[iptablestest.ToDest] == endpoint {
  519. return true
  520. }
  521. }
  522. return false
  523. }
  524. func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
  525. for _, r := range rules {
  526. t.Logf("%q", r)
  527. }
  528. t.Errorf("%v", msg)
  529. }
  530. func TestClusterIPReject(t *testing.T) {
  531. ipt := iptablestest.NewFake()
  532. fp := NewFakeProxier(ipt, false)
  533. svcIP := "10.20.30.41"
  534. svcPort := 80
  535. svcPortName := proxy.ServicePortName{
  536. NamespacedName: makeNSN("ns1", "svc1"),
  537. Port: "p80",
  538. }
  539. makeServiceMap(fp,
  540. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  541. svc.Spec.ClusterIP = svcIP
  542. svc.Spec.Ports = []v1.ServicePort{{
  543. Name: svcPortName.Port,
  544. Port: int32(svcPort),
  545. Protocol: v1.ProtocolTCP,
  546. }}
  547. }),
  548. )
  549. makeEndpointsMap(fp)
  550. fp.syncProxyRules()
  551. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  552. svcRules := ipt.GetRules(svcChain)
  553. if len(svcRules) != 0 {
  554. errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
  555. }
  556. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  557. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
  558. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  559. }
  560. }
  561. func TestClusterIPEndpointsJump(t *testing.T) {
  562. ipt := iptablestest.NewFake()
  563. fp := NewFakeProxier(ipt, false)
  564. svcIP := "10.20.30.41"
  565. svcPort := 80
  566. svcPortName := proxy.ServicePortName{
  567. NamespacedName: makeNSN("ns1", "svc1"),
  568. Port: "p80",
  569. Protocol: v1.ProtocolTCP,
  570. }
  571. makeServiceMap(fp,
  572. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  573. svc.Spec.ClusterIP = svcIP
  574. svc.Spec.Ports = []v1.ServicePort{{
  575. Name: svcPortName.Port,
  576. Port: int32(svcPort),
  577. Protocol: v1.ProtocolTCP,
  578. }}
  579. }),
  580. )
  581. epIP := "10.180.0.1"
  582. makeEndpointsMap(fp,
  583. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  584. ept.Subsets = []v1.EndpointSubset{{
  585. Addresses: []v1.EndpointAddress{{
  586. IP: epIP,
  587. }},
  588. Ports: []v1.EndpointPort{{
  589. Name: svcPortName.Port,
  590. Port: int32(svcPort),
  591. Protocol: v1.ProtocolTCP,
  592. }},
  593. }}
  594. }),
  595. )
  596. fp.syncProxyRules()
  597. epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
  598. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  599. epChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStr))
  600. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  601. if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
  602. errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
  603. }
  604. svcRules := ipt.GetRules(svcChain)
  605. if !hasJump(svcRules, epChain, "", 0) {
  606. errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
  607. }
  608. epRules := ipt.GetRules(epChain)
  609. if !hasDNAT(epRules, epStr) {
  610. errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
  611. }
  612. }
  613. func TestLoadBalancer(t *testing.T) {
  614. ipt := iptablestest.NewFake()
  615. fp := NewFakeProxier(ipt, false)
  616. svcIP := "10.20.30.41"
  617. svcPort := 80
  618. svcNodePort := 3001
  619. svcLBIP := "1.2.3.4"
  620. svcPortName := proxy.ServicePortName{
  621. NamespacedName: makeNSN("ns1", "svc1"),
  622. Port: "p80",
  623. Protocol: v1.ProtocolTCP,
  624. }
  625. makeServiceMap(fp,
  626. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  627. svc.Spec.Type = "LoadBalancer"
  628. svc.Spec.ClusterIP = svcIP
  629. svc.Spec.Ports = []v1.ServicePort{{
  630. Name: svcPortName.Port,
  631. Port: int32(svcPort),
  632. Protocol: v1.ProtocolTCP,
  633. NodePort: int32(svcNodePort),
  634. }}
  635. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  636. IP: svcLBIP,
  637. }}
  638. }),
  639. )
  640. epIP := "10.180.0.1"
  641. makeEndpointsMap(fp,
  642. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  643. ept.Subsets = []v1.EndpointSubset{{
  644. Addresses: []v1.EndpointAddress{{
  645. IP: epIP,
  646. }},
  647. Ports: []v1.EndpointPort{{
  648. Name: svcPortName.Port,
  649. Port: int32(svcPort),
  650. Protocol: v1.ProtocolTCP,
  651. }},
  652. }}
  653. }),
  654. )
  655. fp.syncProxyRules()
  656. proto := strings.ToLower(string(v1.ProtocolTCP))
  657. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  658. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  659. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  660. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  661. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  662. }
  663. fwRules := ipt.GetRules(fwChain)
  664. if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  665. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
  666. }
  667. }
  668. func TestNodePort(t *testing.T) {
  669. ipt := iptablestest.NewFake()
  670. fp := NewFakeProxier(ipt, false)
  671. svcIP := "10.20.30.41"
  672. svcPort := 80
  673. svcNodePort := 3001
  674. svcPortName := proxy.ServicePortName{
  675. NamespacedName: makeNSN("ns1", "svc1"),
  676. Port: "p80",
  677. Protocol: v1.ProtocolTCP,
  678. }
  679. makeServiceMap(fp,
  680. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  681. svc.Spec.Type = "NodePort"
  682. svc.Spec.ClusterIP = svcIP
  683. svc.Spec.Ports = []v1.ServicePort{{
  684. Name: svcPortName.Port,
  685. Port: int32(svcPort),
  686. Protocol: v1.ProtocolTCP,
  687. NodePort: int32(svcNodePort),
  688. }}
  689. }),
  690. )
  691. epIP := "10.180.0.1"
  692. makeEndpointsMap(fp,
  693. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  694. ept.Subsets = []v1.EndpointSubset{{
  695. Addresses: []v1.EndpointAddress{{
  696. IP: epIP,
  697. }},
  698. Ports: []v1.EndpointPort{{
  699. Name: svcPortName.Port,
  700. Port: int32(svcPort),
  701. Protocol: v1.ProtocolTCP,
  702. }},
  703. }}
  704. }),
  705. )
  706. itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
  707. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
  708. itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
  709. addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
  710. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  711. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
  712. fp.nodePortAddresses = []string{}
  713. fp.syncProxyRules()
  714. proto := strings.ToLower(string(v1.ProtocolTCP))
  715. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  716. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  717. if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
  718. errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
  719. }
  720. }
  721. func TestMasqueradeRule(t *testing.T) {
  722. for _, testcase := range []bool{false, true} {
  723. ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
  724. fp := NewFakeProxier(ipt, false)
  725. makeServiceMap(fp)
  726. makeEndpointsMap(fp)
  727. fp.syncProxyRules()
  728. postRoutingRules := ipt.GetRules(string(kubePostroutingChain))
  729. if !hasJump(postRoutingRules, "MASQUERADE", "", 0) {
  730. errorf(fmt.Sprintf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain), postRoutingRules, t)
  731. }
  732. if hasMasqRandomFully(postRoutingRules) != testcase {
  733. probs := map[bool]string{false: "found", true: "did not find"}
  734. errorf(fmt.Sprintf("%s --random-fully in -j MASQUERADE rule in %s chain when HasRandomFully()==%v", probs[testcase], kubePostroutingChain, testcase), postRoutingRules, t)
  735. }
  736. }
  737. }
  738. func TestExternalIPsReject(t *testing.T) {
  739. ipt := iptablestest.NewFake()
  740. fp := NewFakeProxier(ipt, false)
  741. svcIP := "10.20.30.41"
  742. svcPort := 80
  743. svcExternalIPs := "50.60.70.81"
  744. svcPortName := proxy.ServicePortName{
  745. NamespacedName: makeNSN("ns1", "svc1"),
  746. Port: "p80",
  747. }
  748. makeServiceMap(fp,
  749. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  750. svc.Spec.Type = "ClusterIP"
  751. svc.Spec.ClusterIP = svcIP
  752. svc.Spec.ExternalIPs = []string{svcExternalIPs}
  753. svc.Spec.Ports = []v1.ServicePort{{
  754. Name: svcPortName.Port,
  755. Port: int32(svcPort),
  756. Protocol: v1.ProtocolTCP,
  757. TargetPort: intstr.FromInt(svcPort),
  758. }}
  759. }),
  760. )
  761. makeEndpointsMap(fp)
  762. fp.syncProxyRules()
  763. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  764. if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
  765. errorf(fmt.Sprintf("Failed to find a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  766. }
  767. }
  768. func TestNodePortReject(t *testing.T) {
  769. ipt := iptablestest.NewFake()
  770. fp := NewFakeProxier(ipt, false)
  771. svcIP := "10.20.30.41"
  772. svcPort := 80
  773. svcNodePort := 3001
  774. svcPortName := proxy.ServicePortName{
  775. NamespacedName: makeNSN("ns1", "svc1"),
  776. Port: "p80",
  777. }
  778. makeServiceMap(fp,
  779. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  780. svc.Spec.Type = "NodePort"
  781. svc.Spec.ClusterIP = svcIP
  782. svc.Spec.Ports = []v1.ServicePort{{
  783. Name: svcPortName.Port,
  784. Port: int32(svcPort),
  785. Protocol: v1.ProtocolTCP,
  786. NodePort: int32(svcNodePort),
  787. }}
  788. }),
  789. )
  790. makeEndpointsMap(fp)
  791. fp.syncProxyRules()
  792. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  793. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
  794. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  795. }
  796. }
  797. func TestOnlyLocalLoadBalancing(t *testing.T) {
  798. ipt := iptablestest.NewFake()
  799. fp := NewFakeProxier(ipt, false)
  800. svcIP := "10.20.30.41"
  801. svcPort := 80
  802. svcNodePort := 3001
  803. svcLBIP := "1.2.3.4"
  804. svcPortName := proxy.ServicePortName{
  805. NamespacedName: makeNSN("ns1", "svc1"),
  806. Port: "p80",
  807. Protocol: v1.ProtocolTCP,
  808. }
  809. svcSessionAffinityTimeout := int32(10800)
  810. makeServiceMap(fp,
  811. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  812. svc.Spec.Type = "LoadBalancer"
  813. svc.Spec.ClusterIP = svcIP
  814. svc.Spec.Ports = []v1.ServicePort{{
  815. Name: svcPortName.Port,
  816. Port: int32(svcPort),
  817. Protocol: v1.ProtocolTCP,
  818. NodePort: int32(svcNodePort),
  819. }}
  820. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  821. IP: svcLBIP,
  822. }}
  823. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  824. svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  825. svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
  826. ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
  827. }
  828. }),
  829. )
  830. epIP1 := "10.180.0.1"
  831. epIP2 := "10.180.2.1"
  832. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  833. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  834. makeEndpointsMap(fp,
  835. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  836. ept.Subsets = []v1.EndpointSubset{{
  837. Addresses: []v1.EndpointAddress{{
  838. IP: epIP1,
  839. NodeName: nil,
  840. }, {
  841. IP: epIP2,
  842. NodeName: utilpointer.StringPtr(testHostname),
  843. }},
  844. Ports: []v1.EndpointPort{{
  845. Name: svcPortName.Port,
  846. Port: int32(svcPort),
  847. Protocol: v1.ProtocolTCP,
  848. }},
  849. }}
  850. }),
  851. )
  852. fp.syncProxyRules()
  853. proto := strings.ToLower(string(v1.ProtocolTCP))
  854. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  855. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  856. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
  857. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
  858. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  859. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  860. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  861. }
  862. fwRules := ipt.GetRules(fwChain)
  863. if !hasJump(fwRules, lbChain, "", 0) {
  864. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
  865. }
  866. if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  867. errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
  868. }
  869. lbRules := ipt.GetRules(lbChain)
  870. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  871. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  872. }
  873. if !hasJump(lbRules, localEpChain, "", 0) {
  874. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
  875. }
  876. if !hasSessionAffinityRule(lbRules) {
  877. errorf(fmt.Sprintf("Didn't find session affinity rule from lb chain %v", lbChain), lbRules, t)
  878. }
  879. }
  880. func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
  881. ipt := iptablestest.NewFake()
  882. fp := NewFakeProxier(ipt, false)
  883. onlyLocalNodePorts(t, fp, ipt)
  884. }
  885. func TestOnlyLocalNodePorts(t *testing.T) {
  886. ipt := iptablestest.NewFake()
  887. fp := NewFakeProxier(ipt, false)
  888. onlyLocalNodePorts(t, fp, ipt)
  889. }
  890. func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
  891. svcIP := "10.20.30.41"
  892. svcPort := 80
  893. svcNodePort := 3001
  894. svcPortName := proxy.ServicePortName{
  895. NamespacedName: makeNSN("ns1", "svc1"),
  896. Port: "p80",
  897. Protocol: v1.ProtocolTCP,
  898. }
  899. makeServiceMap(fp,
  900. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  901. svc.Spec.Type = "NodePort"
  902. svc.Spec.ClusterIP = svcIP
  903. svc.Spec.Ports = []v1.ServicePort{{
  904. Name: svcPortName.Port,
  905. Port: int32(svcPort),
  906. Protocol: v1.ProtocolTCP,
  907. NodePort: int32(svcNodePort),
  908. }}
  909. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  910. }),
  911. )
  912. epIP1 := "10.180.0.1"
  913. epIP2 := "10.180.2.1"
  914. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  915. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  916. makeEndpointsMap(fp,
  917. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  918. ept.Subsets = []v1.EndpointSubset{{
  919. Addresses: []v1.EndpointAddress{{
  920. IP: epIP1,
  921. NodeName: nil,
  922. }, {
  923. IP: epIP2,
  924. NodeName: utilpointer.StringPtr(testHostname),
  925. }},
  926. Ports: []v1.EndpointPort{{
  927. Name: svcPortName.Port,
  928. Port: int32(svcPort),
  929. Protocol: v1.ProtocolTCP,
  930. }},
  931. }}
  932. }),
  933. )
  934. itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
  935. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "10.20.30.51/24"}}
  936. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  937. fp.nodePortAddresses = []string{"10.20.30.0/24"}
  938. fp.syncProxyRules()
  939. proto := strings.ToLower(string(v1.ProtocolTCP))
  940. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  941. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrLocal))
  942. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrNonLocal))
  943. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  944. if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
  945. errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
  946. }
  947. if !hasJump(kubeNodePortRules, string(KubeMarkMasqChain), "", svcNodePort) {
  948. errorf(fmt.Sprintf("Failed to find jump to %s chain for destination IP %d", KubeMarkMasqChain, svcNodePort), kubeNodePortRules, t)
  949. }
  950. kubeServiceRules := ipt.GetRules(string(kubeServicesChain))
  951. if !hasJump(kubeServiceRules, string(kubeNodePortsChain), "10.20.30.51", 0) {
  952. errorf(fmt.Sprintf("Failed to find jump to KUBE-NODEPORTS chain %v", string(kubeNodePortsChain)), kubeServiceRules, t)
  953. }
  954. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  955. lbRules := ipt.GetRules(lbChain)
  956. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  957. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  958. }
  959. if !hasJump(lbRules, svcChain, "", 0) || !hasSrcType(lbRules, "LOCAL") {
  960. errorf(fmt.Sprintf("Did not find jump from lb chain %v to svc %v with src-type LOCAL", lbChain, svcChain), lbRules, t)
  961. }
  962. if !hasJump(lbRules, localEpChain, "", 0) {
  963. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
  964. }
  965. }
  966. func TestComputeProbability(t *testing.T) {
  967. expectedProbabilities := map[int]string{
  968. 1: "1.0000000000",
  969. 2: "0.5000000000",
  970. 10: "0.1000000000",
  971. 100: "0.0100000000",
  972. 1000: "0.0010000000",
  973. 10000: "0.0001000000",
  974. 100000: "0.0000100000",
  975. 100001: "0.0000099999",
  976. }
  977. for num, expected := range expectedProbabilities {
  978. actual := computeProbability(num)
  979. if actual != expected {
  980. t.Errorf("Expected computeProbability(%d) to be %s, got: %s", num, expected, actual)
  981. }
  982. }
  983. prevProbability := float64(0)
  984. for i := 100000; i > 1; i-- {
  985. currProbability, err := strconv.ParseFloat(computeProbability(i), 64)
  986. if err != nil {
  987. t.Fatalf("Error parsing float probability for %d: %v", i, err)
  988. }
  989. if currProbability <= prevProbability {
  990. t.Fatalf("Probability unexpectedly <= to previous probability for %d: (%0.10f <= %0.10f)", i, currProbability, prevProbability)
  991. }
  992. prevProbability = currProbability
  993. }
  994. }
  995. func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
  996. svc := &v1.Service{
  997. ObjectMeta: metav1.ObjectMeta{
  998. Name: name,
  999. Namespace: namespace,
  1000. Annotations: map[string]string{},
  1001. },
  1002. Spec: v1.ServiceSpec{},
  1003. Status: v1.ServiceStatus{},
  1004. }
  1005. svcFunc(svc)
  1006. return svc
  1007. }
  1008. func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
  1009. svcPort := v1.ServicePort{
  1010. Name: name,
  1011. Protocol: protocol,
  1012. Port: port,
  1013. NodePort: nodeport,
  1014. TargetPort: intstr.FromInt(targetPort),
  1015. }
  1016. return append(array, svcPort)
  1017. }
  1018. func TestBuildServiceMapAddRemove(t *testing.T) {
  1019. ipt := iptablestest.NewFake()
  1020. fp := NewFakeProxier(ipt, false)
  1021. services := []*v1.Service{
  1022. makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1023. svc.Spec.Type = v1.ServiceTypeClusterIP
  1024. svc.Spec.ClusterIP = "172.16.55.4"
  1025. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1026. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1027. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpport", "SCTP", 1236, 6321, 0)
  1028. }),
  1029. makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
  1030. svc.Spec.Type = v1.ServiceTypeNodePort
  1031. svc.Spec.ClusterIP = "172.16.55.10"
  1032. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
  1033. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
  1034. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "muchmoreblah", "SCTP", 343, 676, 0)
  1035. }),
  1036. makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
  1037. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1038. svc.Spec.ClusterIP = "172.16.55.11"
  1039. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1040. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
  1041. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
  1042. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1043. Ingress: []v1.LoadBalancerIngress{
  1044. {IP: "10.1.2.4"},
  1045. },
  1046. }
  1047. }),
  1048. makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
  1049. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1050. svc.Spec.ClusterIP = "172.16.55.12"
  1051. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1052. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
  1053. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
  1054. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1055. Ingress: []v1.LoadBalancerIngress{
  1056. {IP: "10.1.2.3"},
  1057. },
  1058. }
  1059. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1060. svc.Spec.HealthCheckNodePort = 345
  1061. }),
  1062. }
  1063. for i := range services {
  1064. fp.OnServiceAdd(services[i])
  1065. }
  1066. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1067. if len(fp.serviceMap) != 10 {
  1068. t.Errorf("expected service map length 10, got %v", fp.serviceMap)
  1069. }
  1070. // The only-local-loadbalancer ones get added
  1071. if len(result.HCServiceNodePorts) != 1 {
  1072. t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
  1073. } else {
  1074. nsn := makeNSN("somewhere", "only-local-load-balancer")
  1075. if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
  1076. t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
  1077. }
  1078. }
  1079. if len(result.UDPStaleClusterIP) != 0 {
  1080. // Services only added, so nothing stale yet
  1081. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1082. }
  1083. // Remove some stuff
  1084. // oneService is a modification of services[0] with removed first port.
  1085. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1086. svc.Spec.Type = v1.ServiceTypeClusterIP
  1087. svc.Spec.ClusterIP = "172.16.55.4"
  1088. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1089. })
  1090. fp.OnServiceUpdate(services[0], oneService)
  1091. fp.OnServiceDelete(services[1])
  1092. fp.OnServiceDelete(services[2])
  1093. fp.OnServiceDelete(services[3])
  1094. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1095. if len(fp.serviceMap) != 1 {
  1096. t.Errorf("expected service map length 1, got %v", fp.serviceMap)
  1097. }
  1098. if len(result.HCServiceNodePorts) != 0 {
  1099. t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
  1100. }
  1101. // All services but one were deleted. While you'd expect only the ClusterIPs
  1102. // from the three deleted services here, we still have the ClusterIP for
  1103. // the not-deleted service, because one of it's ServicePorts was deleted.
  1104. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
  1105. if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
  1106. t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
  1107. }
  1108. for _, ip := range expectedStaleUDPServices {
  1109. if !result.UDPStaleClusterIP.Has(ip) {
  1110. t.Errorf("expected stale UDP service service %s", ip)
  1111. }
  1112. }
  1113. }
  1114. func TestBuildServiceMapServiceHeadless(t *testing.T) {
  1115. ipt := iptablestest.NewFake()
  1116. fp := NewFakeProxier(ipt, false)
  1117. makeServiceMap(fp,
  1118. makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
  1119. svc.Spec.Type = v1.ServiceTypeClusterIP
  1120. svc.Spec.ClusterIP = v1.ClusterIPNone
  1121. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  1122. }),
  1123. makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
  1124. svc.Spec.Type = v1.ServiceTypeClusterIP
  1125. svc.Spec.ClusterIP = v1.ClusterIPNone
  1126. }),
  1127. )
  1128. // Headless service should be ignored
  1129. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1130. if len(fp.serviceMap) != 0 {
  1131. t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
  1132. }
  1133. // No proxied services, so no healthchecks
  1134. if len(result.HCServiceNodePorts) != 0 {
  1135. t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
  1136. }
  1137. if len(result.UDPStaleClusterIP) != 0 {
  1138. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1139. }
  1140. }
  1141. func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
  1142. ipt := iptablestest.NewFake()
  1143. fp := NewFakeProxier(ipt, false)
  1144. makeServiceMap(fp,
  1145. makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
  1146. svc.Spec.Type = v1.ServiceTypeExternalName
  1147. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  1148. svc.Spec.ExternalName = "foo2.bar.com"
  1149. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
  1150. }),
  1151. )
  1152. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1153. if len(fp.serviceMap) != 0 {
  1154. t.Errorf("expected service map length 0, got %v", fp.serviceMap)
  1155. }
  1156. // No proxied services, so no healthchecks
  1157. if len(result.HCServiceNodePorts) != 0 {
  1158. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1159. }
  1160. if len(result.UDPStaleClusterIP) != 0 {
  1161. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
  1162. }
  1163. }
  1164. func TestBuildServiceMapServiceUpdate(t *testing.T) {
  1165. ipt := iptablestest.NewFake()
  1166. fp := NewFakeProxier(ipt, false)
  1167. servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1168. svc.Spec.Type = v1.ServiceTypeClusterIP
  1169. svc.Spec.ClusterIP = "172.16.55.4"
  1170. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1171. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
  1172. })
  1173. servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1174. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1175. svc.Spec.ClusterIP = "172.16.55.4"
  1176. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1177. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
  1178. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
  1179. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1180. Ingress: []v1.LoadBalancerIngress{
  1181. {IP: "10.1.2.3"},
  1182. },
  1183. }
  1184. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1185. svc.Spec.HealthCheckNodePort = 345
  1186. })
  1187. fp.OnServiceAdd(servicev1)
  1188. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1189. if len(fp.serviceMap) != 2 {
  1190. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1191. }
  1192. if len(result.HCServiceNodePorts) != 0 {
  1193. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1194. }
  1195. if len(result.UDPStaleClusterIP) != 0 {
  1196. // Services only added, so nothing stale yet
  1197. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1198. }
  1199. // Change service to load-balancer
  1200. fp.OnServiceUpdate(servicev1, servicev2)
  1201. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1202. if len(fp.serviceMap) != 2 {
  1203. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1204. }
  1205. if len(result.HCServiceNodePorts) != 1 {
  1206. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1207. }
  1208. if len(result.UDPStaleClusterIP) != 0 {
  1209. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1210. }
  1211. // No change; make sure the service map stays the same and there are
  1212. // no health-check changes
  1213. fp.OnServiceUpdate(servicev2, servicev2)
  1214. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1215. if len(fp.serviceMap) != 2 {
  1216. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1217. }
  1218. if len(result.HCServiceNodePorts) != 1 {
  1219. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1220. }
  1221. if len(result.UDPStaleClusterIP) != 0 {
  1222. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1223. }
  1224. // And back to ClusterIP
  1225. fp.OnServiceUpdate(servicev2, servicev1)
  1226. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1227. if len(fp.serviceMap) != 2 {
  1228. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1229. }
  1230. if len(result.HCServiceNodePorts) != 0 {
  1231. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1232. }
  1233. if len(result.UDPStaleClusterIP) != 0 {
  1234. // Services only added, so nothing stale yet
  1235. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1236. }
  1237. }
  1238. func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
  1239. ept := &v1.Endpoints{
  1240. ObjectMeta: metav1.ObjectMeta{
  1241. Name: name,
  1242. Namespace: namespace,
  1243. },
  1244. }
  1245. eptFunc(ept)
  1246. return ept
  1247. }
  1248. func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
  1249. for i := range allEndpoints {
  1250. proxier.OnEndpointsAdd(allEndpoints[i])
  1251. }
  1252. proxier.mu.Lock()
  1253. defer proxier.mu.Unlock()
  1254. proxier.endpointsSynced = true
  1255. }
  1256. func makeNSN(namespace, name string) types.NamespacedName {
  1257. return types.NamespacedName{Namespace: namespace, Name: name}
  1258. }
  1259. func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
  1260. return proxy.ServicePortName{
  1261. NamespacedName: makeNSN(ns, name),
  1262. Port: port,
  1263. Protocol: protocol,
  1264. }
  1265. }
  1266. func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
  1267. for i := range allServices {
  1268. proxier.OnServiceAdd(allServices[i])
  1269. }
  1270. proxier.mu.Lock()
  1271. defer proxier.mu.Unlock()
  1272. proxier.servicesSynced = true
  1273. }
  1274. func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
  1275. if len(newMap) != len(expected) {
  1276. t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
  1277. }
  1278. for x := range expected {
  1279. if len(newMap[x]) != len(expected[x]) {
  1280. t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
  1281. } else {
  1282. for i := range expected[x] {
  1283. newEp, ok := newMap[x][i].(*endpointsInfo)
  1284. if !ok {
  1285. t.Errorf("Failed to cast endpointsInfo")
  1286. continue
  1287. }
  1288. if newEp.Endpoint != expected[x][i].Endpoint ||
  1289. newEp.IsLocal != expected[x][i].IsLocal ||
  1290. newEp.protocol != expected[x][i].protocol ||
  1291. newEp.chainName != expected[x][i].chainName {
  1292. t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
  1293. }
  1294. }
  1295. }
  1296. }
  1297. }
  1298. func Test_updateEndpointsMap(t *testing.T) {
  1299. var nodeName = testHostname
  1300. emptyEndpoint := func(ept *v1.Endpoints) {
  1301. ept.Subsets = []v1.EndpointSubset{}
  1302. }
  1303. unnamedPort := func(ept *v1.Endpoints) {
  1304. ept.Subsets = []v1.EndpointSubset{{
  1305. Addresses: []v1.EndpointAddress{{
  1306. IP: "1.1.1.1",
  1307. }},
  1308. Ports: []v1.EndpointPort{{
  1309. Port: 11,
  1310. Protocol: v1.ProtocolUDP,
  1311. }},
  1312. }}
  1313. }
  1314. unnamedPortLocal := func(ept *v1.Endpoints) {
  1315. ept.Subsets = []v1.EndpointSubset{{
  1316. Addresses: []v1.EndpointAddress{{
  1317. IP: "1.1.1.1",
  1318. NodeName: &nodeName,
  1319. }},
  1320. Ports: []v1.EndpointPort{{
  1321. Port: 11,
  1322. Protocol: v1.ProtocolUDP,
  1323. }},
  1324. }}
  1325. }
  1326. namedPortLocal := func(ept *v1.Endpoints) {
  1327. ept.Subsets = []v1.EndpointSubset{{
  1328. Addresses: []v1.EndpointAddress{{
  1329. IP: "1.1.1.1",
  1330. NodeName: &nodeName,
  1331. }},
  1332. Ports: []v1.EndpointPort{{
  1333. Name: "p11",
  1334. Port: 11,
  1335. Protocol: v1.ProtocolUDP,
  1336. }},
  1337. }}
  1338. }
  1339. namedPort := func(ept *v1.Endpoints) {
  1340. ept.Subsets = []v1.EndpointSubset{{
  1341. Addresses: []v1.EndpointAddress{{
  1342. IP: "1.1.1.1",
  1343. }},
  1344. Ports: []v1.EndpointPort{{
  1345. Name: "p11",
  1346. Port: 11,
  1347. Protocol: v1.ProtocolUDP,
  1348. }},
  1349. }}
  1350. }
  1351. namedPortRenamed := func(ept *v1.Endpoints) {
  1352. ept.Subsets = []v1.EndpointSubset{{
  1353. Addresses: []v1.EndpointAddress{{
  1354. IP: "1.1.1.1",
  1355. }},
  1356. Ports: []v1.EndpointPort{{
  1357. Name: "p11-2",
  1358. Port: 11,
  1359. Protocol: v1.ProtocolUDP,
  1360. }},
  1361. }}
  1362. }
  1363. namedPortRenumbered := func(ept *v1.Endpoints) {
  1364. ept.Subsets = []v1.EndpointSubset{{
  1365. Addresses: []v1.EndpointAddress{{
  1366. IP: "1.1.1.1",
  1367. }},
  1368. Ports: []v1.EndpointPort{{
  1369. Name: "p11",
  1370. Port: 22,
  1371. Protocol: v1.ProtocolUDP,
  1372. }},
  1373. }}
  1374. }
  1375. namedPortsLocalNoLocal := func(ept *v1.Endpoints) {
  1376. ept.Subsets = []v1.EndpointSubset{{
  1377. Addresses: []v1.EndpointAddress{{
  1378. IP: "1.1.1.1",
  1379. }, {
  1380. IP: "1.1.1.2",
  1381. NodeName: &nodeName,
  1382. }},
  1383. Ports: []v1.EndpointPort{{
  1384. Name: "p11",
  1385. Port: 11,
  1386. Protocol: v1.ProtocolUDP,
  1387. }, {
  1388. Name: "p12",
  1389. Port: 12,
  1390. Protocol: v1.ProtocolUDP,
  1391. }},
  1392. }}
  1393. }
  1394. multipleSubsets := func(ept *v1.Endpoints) {
  1395. ept.Subsets = []v1.EndpointSubset{{
  1396. Addresses: []v1.EndpointAddress{{
  1397. IP: "1.1.1.1",
  1398. }},
  1399. Ports: []v1.EndpointPort{{
  1400. Name: "p11",
  1401. Port: 11,
  1402. Protocol: v1.ProtocolUDP,
  1403. }},
  1404. }, {
  1405. Addresses: []v1.EndpointAddress{{
  1406. IP: "1.1.1.2",
  1407. }},
  1408. Ports: []v1.EndpointPort{{
  1409. Name: "p12",
  1410. Port: 12,
  1411. Protocol: v1.ProtocolUDP,
  1412. }},
  1413. }}
  1414. }
  1415. multipleSubsetsWithLocal := func(ept *v1.Endpoints) {
  1416. ept.Subsets = []v1.EndpointSubset{{
  1417. Addresses: []v1.EndpointAddress{{
  1418. IP: "1.1.1.1",
  1419. }},
  1420. Ports: []v1.EndpointPort{{
  1421. Name: "p11",
  1422. Port: 11,
  1423. Protocol: v1.ProtocolUDP,
  1424. }},
  1425. }, {
  1426. Addresses: []v1.EndpointAddress{{
  1427. IP: "1.1.1.2",
  1428. NodeName: &nodeName,
  1429. }},
  1430. Ports: []v1.EndpointPort{{
  1431. Name: "p12",
  1432. Port: 12,
  1433. Protocol: v1.ProtocolUDP,
  1434. }},
  1435. }}
  1436. }
  1437. multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) {
  1438. ept.Subsets = []v1.EndpointSubset{{
  1439. Addresses: []v1.EndpointAddress{{
  1440. IP: "1.1.1.1",
  1441. NodeName: &nodeName,
  1442. }},
  1443. Ports: []v1.EndpointPort{{
  1444. Name: "p11",
  1445. Port: 11,
  1446. Protocol: v1.ProtocolUDP,
  1447. }, {
  1448. Name: "p12",
  1449. Port: 12,
  1450. Protocol: v1.ProtocolUDP,
  1451. }},
  1452. }, {
  1453. Addresses: []v1.EndpointAddress{{
  1454. IP: "1.1.1.3",
  1455. }},
  1456. Ports: []v1.EndpointPort{{
  1457. Name: "p13",
  1458. Port: 13,
  1459. Protocol: v1.ProtocolUDP,
  1460. }},
  1461. }}
  1462. }
  1463. multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) {
  1464. ept.Subsets = []v1.EndpointSubset{{
  1465. Addresses: []v1.EndpointAddress{{
  1466. IP: "1.1.1.1",
  1467. }, {
  1468. IP: "1.1.1.2",
  1469. NodeName: &nodeName,
  1470. }},
  1471. Ports: []v1.EndpointPort{{
  1472. Name: "p11",
  1473. Port: 11,
  1474. Protocol: v1.ProtocolUDP,
  1475. }, {
  1476. Name: "p12",
  1477. Port: 12,
  1478. Protocol: v1.ProtocolUDP,
  1479. }},
  1480. }, {
  1481. Addresses: []v1.EndpointAddress{{
  1482. IP: "1.1.1.3",
  1483. }, {
  1484. IP: "1.1.1.4",
  1485. NodeName: &nodeName,
  1486. }},
  1487. Ports: []v1.EndpointPort{{
  1488. Name: "p13",
  1489. Port: 13,
  1490. Protocol: v1.ProtocolUDP,
  1491. }, {
  1492. Name: "p14",
  1493. Port: 14,
  1494. Protocol: v1.ProtocolUDP,
  1495. }},
  1496. }}
  1497. }
  1498. multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) {
  1499. ept.Subsets = []v1.EndpointSubset{{
  1500. Addresses: []v1.EndpointAddress{{
  1501. IP: "2.2.2.1",
  1502. }, {
  1503. IP: "2.2.2.2",
  1504. NodeName: &nodeName,
  1505. }},
  1506. Ports: []v1.EndpointPort{{
  1507. Name: "p21",
  1508. Port: 21,
  1509. Protocol: v1.ProtocolUDP,
  1510. }, {
  1511. Name: "p22",
  1512. Port: 22,
  1513. Protocol: v1.ProtocolUDP,
  1514. }},
  1515. }}
  1516. }
  1517. complexBefore1 := func(ept *v1.Endpoints) {
  1518. ept.Subsets = []v1.EndpointSubset{{
  1519. Addresses: []v1.EndpointAddress{{
  1520. IP: "1.1.1.1",
  1521. }},
  1522. Ports: []v1.EndpointPort{{
  1523. Name: "p11",
  1524. Port: 11,
  1525. Protocol: v1.ProtocolUDP,
  1526. }},
  1527. }}
  1528. }
  1529. complexBefore2 := func(ept *v1.Endpoints) {
  1530. ept.Subsets = []v1.EndpointSubset{{
  1531. Addresses: []v1.EndpointAddress{{
  1532. IP: "2.2.2.2",
  1533. NodeName: &nodeName,
  1534. }, {
  1535. IP: "2.2.2.22",
  1536. NodeName: &nodeName,
  1537. }},
  1538. Ports: []v1.EndpointPort{{
  1539. Name: "p22",
  1540. Port: 22,
  1541. Protocol: v1.ProtocolUDP,
  1542. }},
  1543. }, {
  1544. Addresses: []v1.EndpointAddress{{
  1545. IP: "2.2.2.3",
  1546. NodeName: &nodeName,
  1547. }},
  1548. Ports: []v1.EndpointPort{{
  1549. Name: "p23",
  1550. Port: 23,
  1551. Protocol: v1.ProtocolUDP,
  1552. }},
  1553. }}
  1554. }
  1555. complexBefore4 := func(ept *v1.Endpoints) {
  1556. ept.Subsets = []v1.EndpointSubset{{
  1557. Addresses: []v1.EndpointAddress{{
  1558. IP: "4.4.4.4",
  1559. NodeName: &nodeName,
  1560. }, {
  1561. IP: "4.4.4.5",
  1562. NodeName: &nodeName,
  1563. }},
  1564. Ports: []v1.EndpointPort{{
  1565. Name: "p44",
  1566. Port: 44,
  1567. Protocol: v1.ProtocolUDP,
  1568. }},
  1569. }, {
  1570. Addresses: []v1.EndpointAddress{{
  1571. IP: "4.4.4.6",
  1572. NodeName: &nodeName,
  1573. }},
  1574. Ports: []v1.EndpointPort{{
  1575. Name: "p45",
  1576. Port: 45,
  1577. Protocol: v1.ProtocolUDP,
  1578. }},
  1579. }}
  1580. }
  1581. complexAfter1 := func(ept *v1.Endpoints) {
  1582. ept.Subsets = []v1.EndpointSubset{{
  1583. Addresses: []v1.EndpointAddress{{
  1584. IP: "1.1.1.1",
  1585. }, {
  1586. IP: "1.1.1.11",
  1587. }},
  1588. Ports: []v1.EndpointPort{{
  1589. Name: "p11",
  1590. Port: 11,
  1591. Protocol: v1.ProtocolUDP,
  1592. }},
  1593. }, {
  1594. Addresses: []v1.EndpointAddress{{
  1595. IP: "1.1.1.2",
  1596. }},
  1597. Ports: []v1.EndpointPort{{
  1598. Name: "p12",
  1599. Port: 12,
  1600. Protocol: v1.ProtocolUDP,
  1601. }, {
  1602. Name: "p122",
  1603. Port: 122,
  1604. Protocol: v1.ProtocolUDP,
  1605. }},
  1606. }}
  1607. }
  1608. complexAfter3 := func(ept *v1.Endpoints) {
  1609. ept.Subsets = []v1.EndpointSubset{{
  1610. Addresses: []v1.EndpointAddress{{
  1611. IP: "3.3.3.3",
  1612. }},
  1613. Ports: []v1.EndpointPort{{
  1614. Name: "p33",
  1615. Port: 33,
  1616. Protocol: v1.ProtocolUDP,
  1617. }},
  1618. }}
  1619. }
  1620. complexAfter4 := func(ept *v1.Endpoints) {
  1621. ept.Subsets = []v1.EndpointSubset{{
  1622. Addresses: []v1.EndpointAddress{{
  1623. IP: "4.4.4.4",
  1624. NodeName: &nodeName,
  1625. }},
  1626. Ports: []v1.EndpointPort{{
  1627. Name: "p44",
  1628. Port: 44,
  1629. Protocol: v1.ProtocolUDP,
  1630. }},
  1631. }}
  1632. }
  1633. testCases := []struct {
  1634. // previousEndpoints and currentEndpoints are used to call appropriate
  1635. // handlers OnEndpoints* (based on whether corresponding values are nil
  1636. // or non-nil) and must be of equal length.
  1637. previousEndpoints []*v1.Endpoints
  1638. currentEndpoints []*v1.Endpoints
  1639. oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
  1640. expectedResult map[proxy.ServicePortName][]*endpointsInfo
  1641. expectedStaleEndpoints []proxy.ServiceEndpoint
  1642. expectedStaleServiceNames map[proxy.ServicePortName]bool
  1643. expectedHealthchecks map[types.NamespacedName]int
  1644. }{{
  1645. // Case[0]: nothing
  1646. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1647. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1648. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1649. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1650. expectedHealthchecks: map[types.NamespacedName]int{},
  1651. }, {
  1652. // Case[1]: no change, unnamed port
  1653. previousEndpoints: []*v1.Endpoints{
  1654. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1655. },
  1656. currentEndpoints: []*v1.Endpoints{
  1657. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1658. },
  1659. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1660. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1661. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1662. },
  1663. },
  1664. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1665. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1666. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1667. },
  1668. },
  1669. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1670. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1671. expectedHealthchecks: map[types.NamespacedName]int{},
  1672. }, {
  1673. // Case[2]: no change, named port, local
  1674. previousEndpoints: []*v1.Endpoints{
  1675. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1676. },
  1677. currentEndpoints: []*v1.Endpoints{
  1678. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1679. },
  1680. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1681. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1682. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1683. },
  1684. },
  1685. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1686. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1687. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1688. },
  1689. },
  1690. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1691. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1692. expectedHealthchecks: map[types.NamespacedName]int{
  1693. makeNSN("ns1", "ep1"): 1,
  1694. },
  1695. }, {
  1696. // Case[3]: no change, multiple subsets
  1697. previousEndpoints: []*v1.Endpoints{
  1698. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1699. },
  1700. currentEndpoints: []*v1.Endpoints{
  1701. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1702. },
  1703. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1704. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1705. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1706. },
  1707. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1708. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1709. },
  1710. },
  1711. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1712. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1713. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1714. },
  1715. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1716. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1717. },
  1718. },
  1719. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1720. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1721. expectedHealthchecks: map[types.NamespacedName]int{},
  1722. }, {
  1723. // Case[4]: no change, multiple subsets, multiple ports, local
  1724. previousEndpoints: []*v1.Endpoints{
  1725. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1726. },
  1727. currentEndpoints: []*v1.Endpoints{
  1728. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1729. },
  1730. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1731. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1732. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1733. },
  1734. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1735. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1736. },
  1737. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1738. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1739. },
  1740. },
  1741. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1742. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1743. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1744. },
  1745. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1746. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1747. },
  1748. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1749. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1750. },
  1751. },
  1752. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1753. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1754. expectedHealthchecks: map[types.NamespacedName]int{
  1755. makeNSN("ns1", "ep1"): 1,
  1756. },
  1757. }, {
  1758. // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
  1759. previousEndpoints: []*v1.Endpoints{
  1760. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1761. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1762. },
  1763. currentEndpoints: []*v1.Endpoints{
  1764. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1765. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1766. },
  1767. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1768. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1769. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1770. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1771. },
  1772. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1773. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1774. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1775. },
  1776. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1777. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1778. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1779. },
  1780. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  1781. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1782. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1783. },
  1784. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  1785. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1786. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1787. },
  1788. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  1789. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1790. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1791. },
  1792. },
  1793. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1794. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1795. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1796. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1797. },
  1798. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1799. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1800. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1801. },
  1802. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1803. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1804. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1805. },
  1806. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  1807. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1808. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1809. },
  1810. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  1811. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1812. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1813. },
  1814. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  1815. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1816. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1817. },
  1818. },
  1819. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1820. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1821. expectedHealthchecks: map[types.NamespacedName]int{
  1822. makeNSN("ns1", "ep1"): 2,
  1823. makeNSN("ns2", "ep2"): 1,
  1824. },
  1825. }, {
  1826. // Case[6]: add an Endpoints
  1827. previousEndpoints: []*v1.Endpoints{
  1828. nil,
  1829. },
  1830. currentEndpoints: []*v1.Endpoints{
  1831. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1832. },
  1833. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1834. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1835. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1836. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1837. },
  1838. },
  1839. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1840. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1841. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  1842. },
  1843. expectedHealthchecks: map[types.NamespacedName]int{
  1844. makeNSN("ns1", "ep1"): 1,
  1845. },
  1846. }, {
  1847. // Case[7]: remove an Endpoints
  1848. previousEndpoints: []*v1.Endpoints{
  1849. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1850. },
  1851. currentEndpoints: []*v1.Endpoints{
  1852. nil,
  1853. },
  1854. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1855. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1856. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1857. },
  1858. },
  1859. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1860. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1861. Endpoint: "1.1.1.1:11",
  1862. ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
  1863. }},
  1864. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1865. expectedHealthchecks: map[types.NamespacedName]int{},
  1866. }, {
  1867. // Case[8]: add an IP and port
  1868. previousEndpoints: []*v1.Endpoints{
  1869. makeTestEndpoints("ns1", "ep1", namedPort),
  1870. },
  1871. currentEndpoints: []*v1.Endpoints{
  1872. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1873. },
  1874. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1875. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1876. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1877. },
  1878. },
  1879. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1880. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1881. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1882. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1883. },
  1884. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1885. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1886. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1887. },
  1888. },
  1889. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1890. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1891. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  1892. },
  1893. expectedHealthchecks: map[types.NamespacedName]int{
  1894. makeNSN("ns1", "ep1"): 1,
  1895. },
  1896. }, {
  1897. // Case[9]: remove an IP and port
  1898. previousEndpoints: []*v1.Endpoints{
  1899. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1900. },
  1901. currentEndpoints: []*v1.Endpoints{
  1902. makeTestEndpoints("ns1", "ep1", namedPort),
  1903. },
  1904. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1905. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1906. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1907. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1908. },
  1909. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1910. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1911. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1912. },
  1913. },
  1914. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1915. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1916. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1917. },
  1918. },
  1919. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1920. Endpoint: "1.1.1.2:11",
  1921. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  1922. }, {
  1923. Endpoint: "1.1.1.1:12",
  1924. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1925. }, {
  1926. Endpoint: "1.1.1.2:12",
  1927. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1928. }},
  1929. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1930. expectedHealthchecks: map[types.NamespacedName]int{},
  1931. }, {
  1932. // Case[10]: add a subset
  1933. previousEndpoints: []*v1.Endpoints{
  1934. makeTestEndpoints("ns1", "ep1", namedPort),
  1935. },
  1936. currentEndpoints: []*v1.Endpoints{
  1937. makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
  1938. },
  1939. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1940. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1941. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1942. },
  1943. },
  1944. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1945. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1946. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1947. },
  1948. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1949. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1950. },
  1951. },
  1952. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1953. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1954. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  1955. },
  1956. expectedHealthchecks: map[types.NamespacedName]int{
  1957. makeNSN("ns1", "ep1"): 1,
  1958. },
  1959. }, {
  1960. // Case[11]: remove a subset
  1961. previousEndpoints: []*v1.Endpoints{
  1962. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1963. },
  1964. currentEndpoints: []*v1.Endpoints{
  1965. makeTestEndpoints("ns1", "ep1", namedPort),
  1966. },
  1967. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1968. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1969. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1970. },
  1971. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1972. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1973. },
  1974. },
  1975. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1976. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1977. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1978. },
  1979. },
  1980. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1981. Endpoint: "1.1.1.2:12",
  1982. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1983. }},
  1984. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1985. expectedHealthchecks: map[types.NamespacedName]int{},
  1986. }, {
  1987. // Case[12]: rename a port
  1988. previousEndpoints: []*v1.Endpoints{
  1989. makeTestEndpoints("ns1", "ep1", namedPort),
  1990. },
  1991. currentEndpoints: []*v1.Endpoints{
  1992. makeTestEndpoints("ns1", "ep1", namedPortRenamed),
  1993. },
  1994. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1995. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1996. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1997. },
  1998. },
  1999. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2000. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
  2001. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2002. },
  2003. },
  2004. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2005. Endpoint: "1.1.1.1:11",
  2006. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2007. }},
  2008. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2009. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
  2010. },
  2011. expectedHealthchecks: map[types.NamespacedName]int{},
  2012. }, {
  2013. // Case[13]: renumber a port
  2014. previousEndpoints: []*v1.Endpoints{
  2015. makeTestEndpoints("ns1", "ep1", namedPort),
  2016. },
  2017. currentEndpoints: []*v1.Endpoints{
  2018. makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
  2019. },
  2020. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  2021. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2022. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2023. },
  2024. },
  2025. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2026. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2027. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:22", IsLocal: false}},
  2028. },
  2029. },
  2030. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2031. Endpoint: "1.1.1.1:11",
  2032. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2033. }},
  2034. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2035. expectedHealthchecks: map[types.NamespacedName]int{},
  2036. }, {
  2037. // Case[14]: complex add and remove
  2038. previousEndpoints: []*v1.Endpoints{
  2039. makeTestEndpoints("ns1", "ep1", complexBefore1),
  2040. makeTestEndpoints("ns2", "ep2", complexBefore2),
  2041. nil,
  2042. makeTestEndpoints("ns4", "ep4", complexBefore4),
  2043. },
  2044. currentEndpoints: []*v1.Endpoints{
  2045. makeTestEndpoints("ns1", "ep1", complexAfter1),
  2046. nil,
  2047. makeTestEndpoints("ns3", "ep3", complexAfter3),
  2048. makeTestEndpoints("ns4", "ep4", complexAfter4),
  2049. },
  2050. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  2051. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2052. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2053. },
  2054. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  2055. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  2056. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true}},
  2057. },
  2058. makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): {
  2059. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true}},
  2060. },
  2061. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2062. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  2063. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: true}},
  2064. },
  2065. makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): {
  2066. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true}},
  2067. },
  2068. },
  2069. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2070. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2071. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2072. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.11:11", IsLocal: false}},
  2073. },
  2074. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2075. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  2076. },
  2077. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): {
  2078. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:122", IsLocal: false}},
  2079. },
  2080. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): {
  2081. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "3.3.3.3:33", IsLocal: false}},
  2082. },
  2083. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2084. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  2085. },
  2086. },
  2087. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2088. Endpoint: "2.2.2.2:22",
  2089. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2090. }, {
  2091. Endpoint: "2.2.2.22:22",
  2092. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2093. }, {
  2094. Endpoint: "2.2.2.3:23",
  2095. ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
  2096. }, {
  2097. Endpoint: "4.4.4.5:44",
  2098. ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
  2099. }, {
  2100. Endpoint: "4.4.4.6:45",
  2101. ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
  2102. }},
  2103. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2104. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  2105. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
  2106. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
  2107. },
  2108. expectedHealthchecks: map[types.NamespacedName]int{
  2109. makeNSN("ns4", "ep4"): 1,
  2110. },
  2111. }, {
  2112. // Case[15]: change from 0 endpoint address to 1 unnamed port
  2113. previousEndpoints: []*v1.Endpoints{
  2114. makeTestEndpoints("ns1", "ep1", emptyEndpoint),
  2115. },
  2116. currentEndpoints: []*v1.Endpoints{
  2117. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2118. },
  2119. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  2120. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2121. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2122. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2123. },
  2124. },
  2125. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2126. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2127. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  2128. },
  2129. expectedHealthchecks: map[types.NamespacedName]int{},
  2130. },
  2131. }
  2132. for tci, tc := range testCases {
  2133. ipt := iptablestest.NewFake()
  2134. fp := NewFakeProxier(ipt, false)
  2135. fp.hostname = nodeName
  2136. // First check that after adding all previous versions of endpoints,
  2137. // the fp.oldEndpoints is as we expect.
  2138. for i := range tc.previousEndpoints {
  2139. if tc.previousEndpoints[i] != nil {
  2140. fp.OnEndpointsAdd(tc.previousEndpoints[i])
  2141. }
  2142. }
  2143. fp.endpointsMap.Update(fp.endpointsChanges)
  2144. compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
  2145. // Now let's call appropriate handlers to get to state we want to be.
  2146. if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
  2147. t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
  2148. continue
  2149. }
  2150. for i := range tc.previousEndpoints {
  2151. prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
  2152. switch {
  2153. case prev == nil:
  2154. fp.OnEndpointsAdd(curr)
  2155. case curr == nil:
  2156. fp.OnEndpointsDelete(prev)
  2157. default:
  2158. fp.OnEndpointsUpdate(prev, curr)
  2159. }
  2160. }
  2161. result := fp.endpointsMap.Update(fp.endpointsChanges)
  2162. newMap := fp.endpointsMap
  2163. compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
  2164. if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
  2165. t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
  2166. }
  2167. for _, x := range tc.expectedStaleEndpoints {
  2168. found := false
  2169. for _, stale := range result.StaleEndpoints {
  2170. if stale == x {
  2171. found = true
  2172. break
  2173. }
  2174. }
  2175. if !found {
  2176. t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
  2177. }
  2178. }
  2179. if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
  2180. t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
  2181. }
  2182. for svcName := range tc.expectedStaleServiceNames {
  2183. found := false
  2184. for _, stale := range result.StaleServiceNames {
  2185. if stale == svcName {
  2186. found = true
  2187. }
  2188. }
  2189. if !found {
  2190. t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
  2191. }
  2192. }
  2193. if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
  2194. t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
  2195. }
  2196. }
  2197. }
  2198. // The majority of EndpointSlice specific tests are not iptables specific and focus on
  2199. // the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
  2200. // iptables proxier supports translating EndpointSlices to iptables output.
  2201. func TestEndpointSliceE2E(t *testing.T) {
  2202. expectedIPTablesWithSlice := `*filter
  2203. :KUBE-SERVICES - [0:0]
  2204. :KUBE-EXTERNAL-SERVICES - [0:0]
  2205. :KUBE-FORWARD - [0:0]
  2206. -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
  2207. -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
  2208. -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  2209. -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  2210. COMMIT
  2211. *nat
  2212. :KUBE-SERVICES - [0:0]
  2213. :KUBE-NODEPORTS - [0:0]
  2214. :KUBE-POSTROUTING - [0:0]
  2215. :KUBE-MARK-MASQ - [0:0]
  2216. :KUBE-SVC-AHZNAGK3SCETOS2T - [0:0]
  2217. :KUBE-SEP-PXD6POUVGD2I37UY - [0:0]
  2218. :KUBE-SEP-SOKZUIT7SCEVIP33 - [0:0]
  2219. :KUBE-SEP-WVE3FAB34S7NZGDJ - [0:0]
  2220. -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE
  2221. -A KUBE-MARK-MASQ -j MARK --set-xmark
  2222. -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
  2223. -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AHZNAGK3SCETOS2T
  2224. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-PXD6POUVGD2I37UY
  2225. -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -s 10.0.1.1/32 -j KUBE-MARK-MASQ
  2226. -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
  2227. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-SOKZUIT7SCEVIP33
  2228. -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -s 10.0.1.2/32 -j KUBE-MARK-MASQ
  2229. -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
  2230. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -j KUBE-SEP-WVE3FAB34S7NZGDJ
  2231. -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -s 10.0.1.3/32 -j KUBE-MARK-MASQ
  2232. -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
  2233. -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
  2234. COMMIT
  2235. `
  2236. ipt := iptablestest.NewFake()
  2237. fp := NewFakeProxier(ipt, true)
  2238. fp.OnServiceSynced()
  2239. fp.OnEndpointsSynced()
  2240. fp.OnEndpointSlicesSynced()
  2241. serviceName := "svc1"
  2242. namespaceName := "ns1"
  2243. fp.OnServiceAdd(&v1.Service{
  2244. ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
  2245. Spec: v1.ServiceSpec{
  2246. ClusterIP: "172.20.1.1",
  2247. Selector: map[string]string{"foo": "bar"},
  2248. Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
  2249. },
  2250. })
  2251. tcpProtocol := v1.ProtocolTCP
  2252. endpointSlice := &discovery.EndpointSlice{
  2253. ObjectMeta: metav1.ObjectMeta{
  2254. Name: fmt.Sprintf("%s-1", serviceName),
  2255. Namespace: namespaceName,
  2256. Labels: map[string]string{discovery.LabelServiceName: serviceName},
  2257. },
  2258. Ports: []discovery.EndpointPort{{
  2259. Name: utilpointer.StringPtr(""),
  2260. Port: utilpointer.Int32Ptr(80),
  2261. Protocol: &tcpProtocol,
  2262. }},
  2263. AddressType: discovery.AddressTypeIPv4,
  2264. Endpoints: []discovery.Endpoint{{
  2265. Addresses: []string{"10.0.1.1"},
  2266. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2267. Topology: map[string]string{"kubernetes.io/hostname": testHostname},
  2268. }, {
  2269. Addresses: []string{"10.0.1.2"},
  2270. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2271. Topology: map[string]string{"kubernetes.io/hostname": "node2"},
  2272. }, {
  2273. Addresses: []string{"10.0.1.3"},
  2274. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2275. Topology: map[string]string{"kubernetes.io/hostname": "node3"},
  2276. }},
  2277. }
  2278. fp.OnEndpointSliceAdd(endpointSlice)
  2279. fp.syncProxyRules()
  2280. assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String())
  2281. fp.OnEndpointSliceDelete(endpointSlice)
  2282. fp.syncProxyRules()
  2283. assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
  2284. }
  2285. // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.