proxier_test.go 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package iptables
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "strconv"
  20. "strings"
  21. "testing"
  22. "time"
  23. "k8s.io/klog"
  24. "github.com/stretchr/testify/assert"
  25. v1 "k8s.io/api/core/v1"
  26. discovery "k8s.io/api/discovery/v1beta1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/types"
  29. "k8s.io/apimachinery/pkg/util/intstr"
  30. "k8s.io/kubernetes/pkg/proxy"
  31. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  32. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  33. utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
  34. "k8s.io/kubernetes/pkg/util/async"
  35. "k8s.io/kubernetes/pkg/util/conntrack"
  36. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  37. iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
  38. "k8s.io/utils/exec"
  39. fakeexec "k8s.io/utils/exec/testing"
  40. utilpointer "k8s.io/utils/pointer"
  41. )
  42. func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
  43. chainLines := utiliptables.GetChainLines(table, save)
  44. for chain, lineBytes := range chainLines {
  45. line := string(lineBytes)
  46. if expected, exists := expectedLines[chain]; exists {
  47. if expected != line {
  48. t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
  49. }
  50. } else {
  51. t.Errorf("getChainLines expected chain not present: %s", chain)
  52. }
  53. }
  54. }
  55. func TestGetChainLines(t *testing.T) {
  56. iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
  57. *nat
  58. :PREROUTING ACCEPT [2136997:197881818]
  59. :POSTROUTING ACCEPT [4284525:258542680]
  60. :OUTPUT ACCEPT [5901660:357267963]
  61. -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
  62. COMMIT
  63. # Completed on Wed Oct 29 14:56:01 2014`
  64. expected := map[utiliptables.Chain]string{
  65. utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2136997:197881818]",
  66. utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
  67. utiliptables.ChainOutput: ":OUTPUT ACCEPT [5901660:357267963]",
  68. }
  69. checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
  70. }
  71. func TestGetChainLinesMultipleTables(t *testing.T) {
  72. iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
  73. *nat
  74. :PREROUTING ACCEPT [2:138]
  75. :INPUT ACCEPT [0:0]
  76. :OUTPUT ACCEPT [0:0]
  77. :POSTROUTING ACCEPT [0:0]
  78. :DOCKER - [0:0]
  79. :KUBE-NODEPORT-CONTAINER - [0:0]
  80. :KUBE-NODEPORT-HOST - [0:0]
  81. :KUBE-PORTALS-CONTAINER - [0:0]
  82. :KUBE-PORTALS-HOST - [0:0]
  83. :KUBE-SVC-1111111111111111 - [0:0]
  84. :KUBE-SVC-2222222222222222 - [0:0]
  85. :KUBE-SVC-3333333333333333 - [0:0]
  86. :KUBE-SVC-4444444444444444 - [0:0]
  87. :KUBE-SVC-5555555555555555 - [0:0]
  88. :KUBE-SVC-6666666666666666 - [0:0]
  89. -A PREROUTING -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-CONTAINER
  90. -A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
  91. -A PREROUTING -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-CONTAINER
  92. -A OUTPUT -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-HOST
  93. -A OUTPUT ! -d 127.0.0.0/8 -m addrtype --dst-type LOCAL -j DOCKER
  94. -A OUTPUT -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-HOST
  95. -A POSTROUTING -s 10.246.1.0/24 ! -o cbr0 -j MASQUERADE
  96. -A POSTROUTING -s 10.0.2.15/32 -d 10.0.2.15/32 -m comment --comment "handle pod connecting to self" -j MASQUERADE
  97. -A KUBE-PORTALS-CONTAINER -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
  98. -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
  99. -A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
  100. -A KUBE-PORTALS-HOST -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
  101. -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
  102. -A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
  103. -A KUBE-SVC-1111111111111111 -p udp -m comment --comment "kube-system/kube-dns:dns" -m recent --set --name KUBE-SVC-1111111111111111 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
  104. -A KUBE-SVC-2222222222222222 -m comment --comment "kube-system/kube-dns:dns-tcp" -j KUBE-SVC-3333333333333333
  105. -A KUBE-SVC-3333333333333333 -p tcp -m comment --comment "kube-system/kube-dns:dns-tcp" -m recent --set --name KUBE-SVC-3333333333333333 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
  106. -A KUBE-SVC-4444444444444444 -p tcp -m comment --comment "default/kubernetes:" -m recent --set --name KUBE-SVC-4444444444444444 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.245.1.2:443
  107. -A KUBE-SVC-5555555555555555 -m comment --comment "default/kubernetes:" -j KUBE-SVC-4444444444444444
  108. -A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
  109. COMMIT
  110. # Completed on Fri Aug 7 14:47:37 2015
  111. # Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
  112. *filter
  113. :INPUT ACCEPT [17514:83115836]
  114. :FORWARD ACCEPT [0:0]
  115. :OUTPUT ACCEPT [8909:688225]
  116. :DOCKER - [0:0]
  117. -A FORWARD -o cbr0 -j DOCKER
  118. -A FORWARD -o cbr0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  119. -A FORWARD -i cbr0 ! -o cbr0 -j ACCEPT
  120. -A FORWARD -i cbr0 -o cbr0 -j ACCEPT
  121. COMMIT
  122. `
  123. expected := map[utiliptables.Chain]string{
  124. utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2:138]",
  125. utiliptables.Chain("INPUT"): ":INPUT ACCEPT [0:0]",
  126. utiliptables.Chain("OUTPUT"): ":OUTPUT ACCEPT [0:0]",
  127. utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [0:0]",
  128. utiliptables.Chain("DOCKER"): ":DOCKER - [0:0]",
  129. utiliptables.Chain("KUBE-NODEPORT-CONTAINER"): ":KUBE-NODEPORT-CONTAINER - [0:0]",
  130. utiliptables.Chain("KUBE-NODEPORT-HOST"): ":KUBE-NODEPORT-HOST - [0:0]",
  131. utiliptables.Chain("KUBE-PORTALS-CONTAINER"): ":KUBE-PORTALS-CONTAINER - [0:0]",
  132. utiliptables.Chain("KUBE-PORTALS-HOST"): ":KUBE-PORTALS-HOST - [0:0]",
  133. utiliptables.Chain("KUBE-SVC-1111111111111111"): ":KUBE-SVC-1111111111111111 - [0:0]",
  134. utiliptables.Chain("KUBE-SVC-2222222222222222"): ":KUBE-SVC-2222222222222222 - [0:0]",
  135. utiliptables.Chain("KUBE-SVC-3333333333333333"): ":KUBE-SVC-3333333333333333 - [0:0]",
  136. utiliptables.Chain("KUBE-SVC-4444444444444444"): ":KUBE-SVC-4444444444444444 - [0:0]",
  137. utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
  138. utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
  139. }
  140. checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
  141. }
  142. func TestDeleteEndpointConnections(t *testing.T) {
  143. const (
  144. UDP = v1.ProtocolUDP
  145. TCP = v1.ProtocolTCP
  146. SCTP = v1.ProtocolSCTP
  147. )
  148. testCases := []struct {
  149. description string
  150. svcName string
  151. svcIP string
  152. svcPort int32
  153. protocol v1.Protocol
  154. endpoint string // IP:port endpoint
  155. epSvcPair proxy.ServiceEndpoint // Will be generated by test
  156. simulatedErr string
  157. }{
  158. {
  159. description: "V4 UDP",
  160. svcName: "v4-udp",
  161. svcIP: "10.96.1.1",
  162. svcPort: 80,
  163. protocol: UDP,
  164. endpoint: "10.240.0.3:80",
  165. }, {
  166. description: "V4 TCP",
  167. svcName: "v4-tcp",
  168. svcIP: "10.96.2.2",
  169. svcPort: 80,
  170. protocol: TCP,
  171. endpoint: "10.240.0.4:80",
  172. }, {
  173. description: "V4 SCTP",
  174. svcName: "v4-sctp",
  175. svcIP: "10.96.3.3",
  176. svcPort: 80,
  177. protocol: SCTP,
  178. endpoint: "10.240.0.5:80",
  179. }, {
  180. description: "V4 UDP, nothing to delete, benign error",
  181. svcName: "v4-udp-nothing-to-delete",
  182. svcIP: "10.96.1.1",
  183. svcPort: 80,
  184. protocol: UDP,
  185. endpoint: "10.240.0.3:80",
  186. simulatedErr: conntrack.NoConnectionToDelete,
  187. }, {
  188. description: "V4 UDP, unexpected error, should be glogged",
  189. svcName: "v4-udp-simulated-error",
  190. svcIP: "10.96.1.1",
  191. svcPort: 80,
  192. protocol: UDP,
  193. endpoint: "10.240.0.3:80",
  194. simulatedErr: "simulated error",
  195. }, {
  196. description: "V6 UDP",
  197. svcName: "v6-udp",
  198. svcIP: "fd00:1234::20",
  199. svcPort: 80,
  200. protocol: UDP,
  201. endpoint: "[2001:db8::2]:80",
  202. }, {
  203. description: "V6 TCP",
  204. svcName: "v6-tcp",
  205. svcIP: "fd00:1234::30",
  206. svcPort: 80,
  207. protocol: TCP,
  208. endpoint: "[2001:db8::3]:80",
  209. }, {
  210. description: "V6 SCTP",
  211. svcName: "v6-sctp",
  212. svcIP: "fd00:1234::40",
  213. svcPort: 80,
  214. protocol: SCTP,
  215. endpoint: "[2001:db8::4]:80",
  216. },
  217. }
  218. // Create a fake executor for the conntrack utility. This should only be
  219. // invoked for UDP connections, since no conntrack cleanup is needed for TCP
  220. fcmd := fakeexec.FakeCmd{}
  221. fexec := fakeexec.FakeExec{
  222. LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
  223. }
  224. execFunc := func(cmd string, args ...string) exec.Cmd {
  225. return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
  226. }
  227. for _, tc := range testCases {
  228. if tc.protocol == UDP {
  229. var cmdOutput string
  230. var simErr error
  231. if tc.simulatedErr == "" {
  232. cmdOutput = "1 flow entries have been deleted"
  233. } else {
  234. simErr = fmt.Errorf(tc.simulatedErr)
  235. }
  236. cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }
  237. fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
  238. fexec.CommandScript = append(fexec.CommandScript, execFunc)
  239. }
  240. }
  241. ipt := iptablestest.NewFake()
  242. fp := NewFakeProxier(ipt, false)
  243. fp.exec = &fexec
  244. for _, tc := range testCases {
  245. makeServiceMap(fp,
  246. makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
  247. svc.Spec.ClusterIP = tc.svcIP
  248. svc.Spec.Ports = []v1.ServicePort{{
  249. Name: "p80",
  250. Port: tc.svcPort,
  251. Protocol: tc.protocol,
  252. }}
  253. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  254. }),
  255. )
  256. proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  257. }
  258. // Run the test cases
  259. for _, tc := range testCases {
  260. priorExecs := fexec.CommandCalls
  261. priorGlogErrs := klog.Stats.Error.Lines()
  262. svc := proxy.ServicePortName{
  263. NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
  264. Port: "p80",
  265. Protocol: tc.protocol,
  266. }
  267. input := []proxy.ServiceEndpoint{
  268. {
  269. Endpoint: tc.endpoint,
  270. ServicePortName: svc,
  271. },
  272. }
  273. fp.deleteEndpointConnections(input)
  274. // For UDP connections, check the executed conntrack command
  275. var expExecs int
  276. if tc.protocol == UDP {
  277. isIPv6 := func(ip string) bool {
  278. netIP := net.ParseIP(ip)
  279. return netIP.To4() == nil
  280. }
  281. endpointIP := utilproxy.IPPart(tc.endpoint)
  282. expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
  283. if isIPv6(endpointIP) {
  284. expectCommand += " -f ipv6"
  285. }
  286. actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
  287. if actualCommand != expectCommand {
  288. t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
  289. }
  290. expExecs = 1
  291. }
  292. // Check the number of times conntrack was executed
  293. execs := fexec.CommandCalls - priorExecs
  294. if execs != expExecs {
  295. t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
  296. }
  297. // Check the number of new glog errors
  298. var expGlogErrs int64
  299. if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
  300. expGlogErrs = 1
  301. }
  302. glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
  303. if glogErrs != expGlogErrs {
  304. t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
  305. }
  306. }
  307. }
  308. // fakePortOpener implements portOpener.
  309. type fakePortOpener struct {
  310. openPorts []*utilproxy.LocalPort
  311. }
  312. // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
  313. // to lock a local port.
  314. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
  315. f.openPorts = append(f.openPorts, lp)
  316. return nil, nil
  317. }
  318. const testHostname = "test-hostname"
  319. func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
  320. // TODO: Call NewProxier after refactoring out the goroutine
  321. // invocation into a Run() method.
  322. p := &Proxier{
  323. exec: &fakeexec.FakeExec{},
  324. serviceMap: make(proxy.ServiceMap),
  325. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
  326. endpointsMap: make(proxy.EndpointsMap),
  327. endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
  328. iptables: ipt,
  329. clusterCIDR: "10.0.0.0/24",
  330. hostname: testHostname,
  331. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  332. portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
  333. serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
  334. precomputedProbabilities: make([]string, 0, 1001),
  335. iptablesData: bytes.NewBuffer(nil),
  336. existingFilterChainsData: bytes.NewBuffer(nil),
  337. filterChains: bytes.NewBuffer(nil),
  338. filterRules: bytes.NewBuffer(nil),
  339. natChains: bytes.NewBuffer(nil),
  340. natRules: bytes.NewBuffer(nil),
  341. nodePortAddresses: make([]string, 0),
  342. networkInterfacer: utilproxytest.NewFakeNetwork(),
  343. }
  344. p.setInitialized(true)
  345. p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
  346. return p
  347. }
  348. func hasSessionAffinityRule(rules []iptablestest.Rule) bool {
  349. for _, r := range rules {
  350. if _, ok := r[iptablestest.Recent]; ok {
  351. return true
  352. }
  353. }
  354. return false
  355. }
  356. func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
  357. destPortStr := strconv.Itoa(destPort)
  358. match := false
  359. for _, r := range rules {
  360. if r[iptablestest.Jump] == destChain {
  361. match = true
  362. if destIP != "" {
  363. if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
  364. return true
  365. }
  366. match = false
  367. }
  368. if destPort != 0 {
  369. if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
  370. return true
  371. }
  372. match = false
  373. }
  374. }
  375. }
  376. return match
  377. }
  378. func hasSrcType(rules []iptablestest.Rule, srcType string) bool {
  379. for _, r := range rules {
  380. if r[iptablestest.SrcType] != srcType {
  381. continue
  382. }
  383. return true
  384. }
  385. return false
  386. }
  387. func hasMasqRandomFully(rules []iptablestest.Rule) bool {
  388. for _, r := range rules {
  389. if r[iptablestest.Masquerade] == "--random-fully" {
  390. return true
  391. }
  392. }
  393. return false
  394. }
  395. func TestHasJump(t *testing.T) {
  396. testCases := map[string]struct {
  397. rules []iptablestest.Rule
  398. destChain string
  399. destIP string
  400. destPort int
  401. expected bool
  402. }{
  403. "case 1": {
  404. // Match the 1st rule(both dest IP and dest Port)
  405. rules: []iptablestest.Rule{
  406. {"-d ": "10.20.30.41/32", "--dport ": "80", "-p ": "tcp", "-j ": "REJECT"},
  407. {"--dport ": "3001", "-p ": "tcp", "-j ": "KUBE-MARK-MASQ"},
  408. },
  409. destChain: "REJECT",
  410. destIP: "10.20.30.41",
  411. destPort: 80,
  412. expected: true,
  413. },
  414. "case 2": {
  415. // Match the 2nd rule(dest Port)
  416. rules: []iptablestest.Rule{
  417. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  418. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  419. },
  420. destChain: "REJECT",
  421. destIP: "",
  422. destPort: 3001,
  423. expected: true,
  424. },
  425. "case 3": {
  426. // Match both dest IP and dest Port
  427. rules: []iptablestest.Rule{
  428. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  429. },
  430. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  431. destIP: "1.2.3.4",
  432. destPort: 80,
  433. expected: true,
  434. },
  435. "case 4": {
  436. // Match dest IP but doesn't match dest Port
  437. rules: []iptablestest.Rule{
  438. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  439. },
  440. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  441. destIP: "1.2.3.4",
  442. destPort: 8080,
  443. expected: false,
  444. },
  445. "case 5": {
  446. // Match dest Port but doesn't match dest IP
  447. rules: []iptablestest.Rule{
  448. {"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
  449. },
  450. destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
  451. destIP: "10.20.30.40",
  452. destPort: 80,
  453. expected: false,
  454. },
  455. "case 6": {
  456. // Match the 2nd rule(dest IP)
  457. rules: []iptablestest.Rule{
  458. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  459. {"-d ": "1.2.3.4/32", "-p ": "tcp", "-j ": "REJECT"},
  460. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  461. },
  462. destChain: "REJECT",
  463. destIP: "1.2.3.4",
  464. destPort: 8080,
  465. expected: true,
  466. },
  467. "case 7": {
  468. // Match the 2nd rule(dest Port)
  469. rules: []iptablestest.Rule{
  470. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  471. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  472. },
  473. destChain: "REJECT",
  474. destIP: "1.2.3.4",
  475. destPort: 3001,
  476. expected: true,
  477. },
  478. "case 8": {
  479. // Match the 1st rule(dest IP)
  480. rules: []iptablestest.Rule{
  481. {"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
  482. {"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
  483. },
  484. destChain: "REJECT",
  485. destIP: "10.20.30.41",
  486. destPort: 8080,
  487. expected: true,
  488. },
  489. "case 9": {
  490. rules: []iptablestest.Rule{
  491. {"-j ": "KUBE-SEP-LWSOSDSHMKPJHHJV"},
  492. },
  493. destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
  494. destIP: "",
  495. destPort: 0,
  496. expected: true,
  497. },
  498. "case 10": {
  499. rules: []iptablestest.Rule{
  500. {"-j ": "KUBE-SEP-FOO"},
  501. },
  502. destChain: "KUBE-SEP-BAR",
  503. destIP: "",
  504. destPort: 0,
  505. expected: false,
  506. },
  507. }
  508. for k, tc := range testCases {
  509. if got := hasJump(tc.rules, tc.destChain, tc.destIP, tc.destPort); got != tc.expected {
  510. t.Errorf("%v: expected %v, got %v", k, tc.expected, got)
  511. }
  512. }
  513. }
  514. func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
  515. for _, r := range rules {
  516. if r[iptablestest.ToDest] == endpoint {
  517. return true
  518. }
  519. }
  520. return false
  521. }
  522. func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
  523. for _, r := range rules {
  524. t.Logf("%q", r)
  525. }
  526. t.Errorf("%v", msg)
  527. }
  528. func TestClusterIPReject(t *testing.T) {
  529. ipt := iptablestest.NewFake()
  530. fp := NewFakeProxier(ipt, false)
  531. svcIP := "10.20.30.41"
  532. svcPort := 80
  533. svcPortName := proxy.ServicePortName{
  534. NamespacedName: makeNSN("ns1", "svc1"),
  535. Port: "p80",
  536. }
  537. makeServiceMap(fp,
  538. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  539. svc.Spec.ClusterIP = svcIP
  540. svc.Spec.Ports = []v1.ServicePort{{
  541. Name: svcPortName.Port,
  542. Port: int32(svcPort),
  543. Protocol: v1.ProtocolTCP,
  544. }}
  545. }),
  546. )
  547. makeEndpointsMap(fp)
  548. fp.syncProxyRules()
  549. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  550. svcRules := ipt.GetRules(svcChain)
  551. if len(svcRules) != 0 {
  552. errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
  553. }
  554. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  555. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
  556. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  557. }
  558. }
  559. func TestClusterIPEndpointsJump(t *testing.T) {
  560. ipt := iptablestest.NewFake()
  561. fp := NewFakeProxier(ipt, false)
  562. svcIP := "10.20.30.41"
  563. svcPort := 80
  564. svcPortName := proxy.ServicePortName{
  565. NamespacedName: makeNSN("ns1", "svc1"),
  566. Port: "p80",
  567. Protocol: v1.ProtocolTCP,
  568. }
  569. makeServiceMap(fp,
  570. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  571. svc.Spec.ClusterIP = svcIP
  572. svc.Spec.Ports = []v1.ServicePort{{
  573. Name: svcPortName.Port,
  574. Port: int32(svcPort),
  575. Protocol: v1.ProtocolTCP,
  576. }}
  577. }),
  578. )
  579. epIP := "10.180.0.1"
  580. makeEndpointsMap(fp,
  581. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  582. ept.Subsets = []v1.EndpointSubset{{
  583. Addresses: []v1.EndpointAddress{{
  584. IP: epIP,
  585. }},
  586. Ports: []v1.EndpointPort{{
  587. Name: svcPortName.Port,
  588. Port: int32(svcPort),
  589. Protocol: v1.ProtocolTCP,
  590. }},
  591. }}
  592. }),
  593. )
  594. fp.syncProxyRules()
  595. epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
  596. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  597. epChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStr))
  598. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  599. if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
  600. errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
  601. }
  602. svcRules := ipt.GetRules(svcChain)
  603. if !hasJump(svcRules, epChain, "", 0) {
  604. errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
  605. }
  606. epRules := ipt.GetRules(epChain)
  607. if !hasDNAT(epRules, epStr) {
  608. errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
  609. }
  610. }
  611. func TestLoadBalancer(t *testing.T) {
  612. ipt := iptablestest.NewFake()
  613. fp := NewFakeProxier(ipt, false)
  614. svcIP := "10.20.30.41"
  615. svcPort := 80
  616. svcNodePort := 3001
  617. svcLBIP := "1.2.3.4"
  618. svcPortName := proxy.ServicePortName{
  619. NamespacedName: makeNSN("ns1", "svc1"),
  620. Port: "p80",
  621. Protocol: v1.ProtocolTCP,
  622. }
  623. makeServiceMap(fp,
  624. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  625. svc.Spec.Type = "LoadBalancer"
  626. svc.Spec.ClusterIP = svcIP
  627. svc.Spec.Ports = []v1.ServicePort{{
  628. Name: svcPortName.Port,
  629. Port: int32(svcPort),
  630. Protocol: v1.ProtocolTCP,
  631. NodePort: int32(svcNodePort),
  632. }}
  633. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  634. IP: svcLBIP,
  635. }}
  636. }),
  637. )
  638. epIP := "10.180.0.1"
  639. makeEndpointsMap(fp,
  640. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  641. ept.Subsets = []v1.EndpointSubset{{
  642. Addresses: []v1.EndpointAddress{{
  643. IP: epIP,
  644. }},
  645. Ports: []v1.EndpointPort{{
  646. Name: svcPortName.Port,
  647. Port: int32(svcPort),
  648. Protocol: v1.ProtocolTCP,
  649. }},
  650. }}
  651. }),
  652. )
  653. fp.syncProxyRules()
  654. proto := strings.ToLower(string(v1.ProtocolTCP))
  655. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  656. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  657. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  658. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  659. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  660. }
  661. fwRules := ipt.GetRules(fwChain)
  662. if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  663. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
  664. }
  665. }
  666. func TestNodePort(t *testing.T) {
  667. ipt := iptablestest.NewFake()
  668. fp := NewFakeProxier(ipt, false)
  669. svcIP := "10.20.30.41"
  670. svcPort := 80
  671. svcNodePort := 3001
  672. svcPortName := proxy.ServicePortName{
  673. NamespacedName: makeNSN("ns1", "svc1"),
  674. Port: "p80",
  675. Protocol: v1.ProtocolTCP,
  676. }
  677. makeServiceMap(fp,
  678. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  679. svc.Spec.Type = "NodePort"
  680. svc.Spec.ClusterIP = svcIP
  681. svc.Spec.Ports = []v1.ServicePort{{
  682. Name: svcPortName.Port,
  683. Port: int32(svcPort),
  684. Protocol: v1.ProtocolTCP,
  685. NodePort: int32(svcNodePort),
  686. }}
  687. }),
  688. )
  689. epIP := "10.180.0.1"
  690. makeEndpointsMap(fp,
  691. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  692. ept.Subsets = []v1.EndpointSubset{{
  693. Addresses: []v1.EndpointAddress{{
  694. IP: epIP,
  695. }},
  696. Ports: []v1.EndpointPort{{
  697. Name: svcPortName.Port,
  698. Port: int32(svcPort),
  699. Protocol: v1.ProtocolTCP,
  700. }},
  701. }}
  702. }),
  703. )
  704. itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
  705. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
  706. itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
  707. addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
  708. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  709. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
  710. fp.nodePortAddresses = []string{}
  711. fp.syncProxyRules()
  712. proto := strings.ToLower(string(v1.ProtocolTCP))
  713. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  714. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  715. if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
  716. errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
  717. }
  718. }
  719. func TestMasqueradeRule(t *testing.T) {
  720. for _, testcase := range []bool{false, true} {
  721. ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
  722. fp := NewFakeProxier(ipt, false)
  723. makeServiceMap(fp)
  724. makeEndpointsMap(fp)
  725. fp.syncProxyRules()
  726. postRoutingRules := ipt.GetRules(string(kubePostroutingChain))
  727. if !hasJump(postRoutingRules, "MASQUERADE", "", 0) {
  728. errorf(fmt.Sprintf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain), postRoutingRules, t)
  729. }
  730. if hasMasqRandomFully(postRoutingRules) != testcase {
  731. probs := map[bool]string{false: "found", true: "did not find"}
  732. errorf(fmt.Sprintf("%s --random-fully in -j MASQUERADE rule in %s chain when HasRandomFully()==%v", probs[testcase], kubePostroutingChain, testcase), postRoutingRules, t)
  733. }
  734. }
  735. }
  736. func TestExternalIPsReject(t *testing.T) {
  737. ipt := iptablestest.NewFake()
  738. fp := NewFakeProxier(ipt, false)
  739. svcIP := "10.20.30.41"
  740. svcPort := 80
  741. svcExternalIPs := "50.60.70.81"
  742. svcPortName := proxy.ServicePortName{
  743. NamespacedName: makeNSN("ns1", "svc1"),
  744. Port: "p80",
  745. }
  746. makeServiceMap(fp,
  747. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  748. svc.Spec.Type = "ClusterIP"
  749. svc.Spec.ClusterIP = svcIP
  750. svc.Spec.ExternalIPs = []string{svcExternalIPs}
  751. svc.Spec.Ports = []v1.ServicePort{{
  752. Name: svcPortName.Port,
  753. Port: int32(svcPort),
  754. Protocol: v1.ProtocolTCP,
  755. TargetPort: intstr.FromInt(svcPort),
  756. }}
  757. }),
  758. )
  759. makeEndpointsMap(fp)
  760. fp.syncProxyRules()
  761. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  762. if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
  763. errorf(fmt.Sprintf("Failed to find a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  764. }
  765. }
  766. func TestNodePortReject(t *testing.T) {
  767. ipt := iptablestest.NewFake()
  768. fp := NewFakeProxier(ipt, false)
  769. svcIP := "10.20.30.41"
  770. svcPort := 80
  771. svcNodePort := 3001
  772. svcPortName := proxy.ServicePortName{
  773. NamespacedName: makeNSN("ns1", "svc1"),
  774. Port: "p80",
  775. }
  776. makeServiceMap(fp,
  777. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  778. svc.Spec.Type = "NodePort"
  779. svc.Spec.ClusterIP = svcIP
  780. svc.Spec.Ports = []v1.ServicePort{{
  781. Name: svcPortName.Port,
  782. Port: int32(svcPort),
  783. Protocol: v1.ProtocolTCP,
  784. NodePort: int32(svcNodePort),
  785. }}
  786. }),
  787. )
  788. makeEndpointsMap(fp)
  789. fp.syncProxyRules()
  790. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  791. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
  792. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  793. }
  794. }
  795. func TestOnlyLocalLoadBalancing(t *testing.T) {
  796. ipt := iptablestest.NewFake()
  797. fp := NewFakeProxier(ipt, false)
  798. svcIP := "10.20.30.41"
  799. svcPort := 80
  800. svcNodePort := 3001
  801. svcLBIP := "1.2.3.4"
  802. svcPortName := proxy.ServicePortName{
  803. NamespacedName: makeNSN("ns1", "svc1"),
  804. Port: "p80",
  805. Protocol: v1.ProtocolTCP,
  806. }
  807. svcSessionAffinityTimeout := int32(10800)
  808. makeServiceMap(fp,
  809. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  810. svc.Spec.Type = "LoadBalancer"
  811. svc.Spec.ClusterIP = svcIP
  812. svc.Spec.Ports = []v1.ServicePort{{
  813. Name: svcPortName.Port,
  814. Port: int32(svcPort),
  815. Protocol: v1.ProtocolTCP,
  816. NodePort: int32(svcNodePort),
  817. }}
  818. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  819. IP: svcLBIP,
  820. }}
  821. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  822. svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  823. svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
  824. ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
  825. }
  826. }),
  827. )
  828. epIP1 := "10.180.0.1"
  829. epIP2 := "10.180.2.1"
  830. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  831. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  832. makeEndpointsMap(fp,
  833. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  834. ept.Subsets = []v1.EndpointSubset{{
  835. Addresses: []v1.EndpointAddress{{
  836. IP: epIP1,
  837. NodeName: nil,
  838. }, {
  839. IP: epIP2,
  840. NodeName: utilpointer.StringPtr(testHostname),
  841. }},
  842. Ports: []v1.EndpointPort{{
  843. Name: svcPortName.Port,
  844. Port: int32(svcPort),
  845. Protocol: v1.ProtocolTCP,
  846. }},
  847. }}
  848. }),
  849. )
  850. fp.syncProxyRules()
  851. proto := strings.ToLower(string(v1.ProtocolTCP))
  852. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  853. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  854. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
  855. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
  856. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  857. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  858. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  859. }
  860. fwRules := ipt.GetRules(fwChain)
  861. if !hasJump(fwRules, lbChain, "", 0) {
  862. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
  863. }
  864. if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  865. errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
  866. }
  867. lbRules := ipt.GetRules(lbChain)
  868. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  869. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  870. }
  871. if !hasJump(lbRules, localEpChain, "", 0) {
  872. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
  873. }
  874. if !hasSessionAffinityRule(lbRules) {
  875. errorf(fmt.Sprintf("Didn't find session affinity rule from lb chain %v", lbChain), lbRules, t)
  876. }
  877. }
  878. func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
  879. ipt := iptablestest.NewFake()
  880. fp := NewFakeProxier(ipt, false)
  881. // set cluster CIDR to empty before test
  882. fp.clusterCIDR = ""
  883. onlyLocalNodePorts(t, fp, ipt)
  884. }
  885. func TestOnlyLocalNodePorts(t *testing.T) {
  886. ipt := iptablestest.NewFake()
  887. fp := NewFakeProxier(ipt, false)
  888. onlyLocalNodePorts(t, fp, ipt)
  889. }
  890. func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
  891. svcIP := "10.20.30.41"
  892. svcPort := 80
  893. svcNodePort := 3001
  894. svcPortName := proxy.ServicePortName{
  895. NamespacedName: makeNSN("ns1", "svc1"),
  896. Port: "p80",
  897. Protocol: v1.ProtocolTCP,
  898. }
  899. makeServiceMap(fp,
  900. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  901. svc.Spec.Type = "NodePort"
  902. svc.Spec.ClusterIP = svcIP
  903. svc.Spec.Ports = []v1.ServicePort{{
  904. Name: svcPortName.Port,
  905. Port: int32(svcPort),
  906. Protocol: v1.ProtocolTCP,
  907. NodePort: int32(svcNodePort),
  908. }}
  909. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  910. }),
  911. )
  912. epIP1 := "10.180.0.1"
  913. epIP2 := "10.180.2.1"
  914. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  915. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  916. makeEndpointsMap(fp,
  917. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  918. ept.Subsets = []v1.EndpointSubset{{
  919. Addresses: []v1.EndpointAddress{{
  920. IP: epIP1,
  921. NodeName: nil,
  922. }, {
  923. IP: epIP2,
  924. NodeName: utilpointer.StringPtr(testHostname),
  925. }},
  926. Ports: []v1.EndpointPort{{
  927. Name: svcPortName.Port,
  928. Port: int32(svcPort),
  929. Protocol: v1.ProtocolTCP,
  930. }},
  931. }}
  932. }),
  933. )
  934. itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
  935. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "10.20.30.51/24"}}
  936. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  937. fp.nodePortAddresses = []string{"10.20.30.0/24"}
  938. fp.syncProxyRules()
  939. proto := strings.ToLower(string(v1.ProtocolTCP))
  940. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  941. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrLocal))
  942. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrNonLocal))
  943. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  944. if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
  945. errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
  946. }
  947. if !hasJump(kubeNodePortRules, string(KubeMarkMasqChain), "", svcNodePort) {
  948. errorf(fmt.Sprintf("Failed to find jump to %s chain for destination IP %d", KubeMarkMasqChain, svcNodePort), kubeNodePortRules, t)
  949. }
  950. kubeServiceRules := ipt.GetRules(string(kubeServicesChain))
  951. if !hasJump(kubeServiceRules, string(kubeNodePortsChain), "10.20.30.51", 0) {
  952. errorf(fmt.Sprintf("Failed to find jump to KUBE-NODEPORTS chain %v", string(kubeNodePortsChain)), kubeServiceRules, t)
  953. }
  954. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  955. lbRules := ipt.GetRules(lbChain)
  956. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  957. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  958. }
  959. if !hasJump(lbRules, svcChain, "", 0) || !hasSrcType(lbRules, "LOCAL") {
  960. errorf(fmt.Sprintf("Did not find jump from lb chain %v to svc %v with src-type LOCAL", lbChain, svcChain), lbRules, t)
  961. }
  962. if !hasJump(lbRules, localEpChain, "", 0) {
  963. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
  964. }
  965. }
  966. func TestComputeProbability(t *testing.T) {
  967. expectedProbabilities := map[int]string{
  968. 1: "1.0000000000",
  969. 2: "0.5000000000",
  970. 10: "0.1000000000",
  971. 100: "0.0100000000",
  972. 1000: "0.0010000000",
  973. 10000: "0.0001000000",
  974. 100000: "0.0000100000",
  975. 100001: "0.0000099999",
  976. }
  977. for num, expected := range expectedProbabilities {
  978. actual := computeProbability(num)
  979. if actual != expected {
  980. t.Errorf("Expected computeProbability(%d) to be %s, got: %s", num, expected, actual)
  981. }
  982. }
  983. prevProbability := float64(0)
  984. for i := 100000; i > 1; i-- {
  985. currProbability, err := strconv.ParseFloat(computeProbability(i), 64)
  986. if err != nil {
  987. t.Fatalf("Error parsing float probability for %d: %v", i, err)
  988. }
  989. if currProbability <= prevProbability {
  990. t.Fatalf("Probability unexpectedly <= to previous probability for %d: (%0.10f <= %0.10f)", i, currProbability, prevProbability)
  991. }
  992. prevProbability = currProbability
  993. }
  994. }
  995. func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
  996. svc := &v1.Service{
  997. ObjectMeta: metav1.ObjectMeta{
  998. Name: name,
  999. Namespace: namespace,
  1000. Annotations: map[string]string{},
  1001. },
  1002. Spec: v1.ServiceSpec{},
  1003. Status: v1.ServiceStatus{},
  1004. }
  1005. svcFunc(svc)
  1006. return svc
  1007. }
  1008. func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
  1009. svcPort := v1.ServicePort{
  1010. Name: name,
  1011. Protocol: protocol,
  1012. Port: port,
  1013. NodePort: nodeport,
  1014. TargetPort: intstr.FromInt(targetPort),
  1015. }
  1016. return append(array, svcPort)
  1017. }
  1018. func TestBuildServiceMapAddRemove(t *testing.T) {
  1019. ipt := iptablestest.NewFake()
  1020. fp := NewFakeProxier(ipt, false)
  1021. services := []*v1.Service{
  1022. makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1023. svc.Spec.Type = v1.ServiceTypeClusterIP
  1024. svc.Spec.ClusterIP = "172.16.55.4"
  1025. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1026. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1027. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpport", "SCTP", 1236, 6321, 0)
  1028. }),
  1029. makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
  1030. svc.Spec.Type = v1.ServiceTypeNodePort
  1031. svc.Spec.ClusterIP = "172.16.55.10"
  1032. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
  1033. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
  1034. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "muchmoreblah", "SCTP", 343, 676, 0)
  1035. }),
  1036. makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
  1037. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1038. svc.Spec.ClusterIP = "172.16.55.11"
  1039. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1040. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
  1041. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
  1042. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1043. Ingress: []v1.LoadBalancerIngress{
  1044. {IP: "10.1.2.4"},
  1045. },
  1046. }
  1047. }),
  1048. makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
  1049. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1050. svc.Spec.ClusterIP = "172.16.55.12"
  1051. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1052. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
  1053. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
  1054. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1055. Ingress: []v1.LoadBalancerIngress{
  1056. {IP: "10.1.2.3"},
  1057. },
  1058. }
  1059. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1060. svc.Spec.HealthCheckNodePort = 345
  1061. }),
  1062. }
  1063. for i := range services {
  1064. fp.OnServiceAdd(services[i])
  1065. }
  1066. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1067. if len(fp.serviceMap) != 10 {
  1068. t.Errorf("expected service map length 10, got %v", fp.serviceMap)
  1069. }
  1070. // The only-local-loadbalancer ones get added
  1071. if len(result.HCServiceNodePorts) != 1 {
  1072. t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
  1073. } else {
  1074. nsn := makeNSN("somewhere", "only-local-load-balancer")
  1075. if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
  1076. t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
  1077. }
  1078. }
  1079. if len(result.UDPStaleClusterIP) != 0 {
  1080. // Services only added, so nothing stale yet
  1081. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1082. }
  1083. // Remove some stuff
  1084. // oneService is a modification of services[0] with removed first port.
  1085. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1086. svc.Spec.Type = v1.ServiceTypeClusterIP
  1087. svc.Spec.ClusterIP = "172.16.55.4"
  1088. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1089. })
  1090. fp.OnServiceUpdate(services[0], oneService)
  1091. fp.OnServiceDelete(services[1])
  1092. fp.OnServiceDelete(services[2])
  1093. fp.OnServiceDelete(services[3])
  1094. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1095. if len(fp.serviceMap) != 1 {
  1096. t.Errorf("expected service map length 1, got %v", fp.serviceMap)
  1097. }
  1098. if len(result.HCServiceNodePorts) != 0 {
  1099. t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
  1100. }
  1101. // All services but one were deleted. While you'd expect only the ClusterIPs
  1102. // from the three deleted services here, we still have the ClusterIP for
  1103. // the not-deleted service, because one of it's ServicePorts was deleted.
  1104. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
  1105. if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
  1106. t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
  1107. }
  1108. for _, ip := range expectedStaleUDPServices {
  1109. if !result.UDPStaleClusterIP.Has(ip) {
  1110. t.Errorf("expected stale UDP service service %s", ip)
  1111. }
  1112. }
  1113. }
  1114. func TestBuildServiceMapServiceHeadless(t *testing.T) {
  1115. ipt := iptablestest.NewFake()
  1116. fp := NewFakeProxier(ipt, false)
  1117. makeServiceMap(fp,
  1118. makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
  1119. svc.Spec.Type = v1.ServiceTypeClusterIP
  1120. svc.Spec.ClusterIP = v1.ClusterIPNone
  1121. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  1122. }),
  1123. makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
  1124. svc.Spec.Type = v1.ServiceTypeClusterIP
  1125. svc.Spec.ClusterIP = v1.ClusterIPNone
  1126. }),
  1127. )
  1128. // Headless service should be ignored
  1129. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1130. if len(fp.serviceMap) != 0 {
  1131. t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
  1132. }
  1133. // No proxied services, so no healthchecks
  1134. if len(result.HCServiceNodePorts) != 0 {
  1135. t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
  1136. }
  1137. if len(result.UDPStaleClusterIP) != 0 {
  1138. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1139. }
  1140. }
  1141. func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
  1142. ipt := iptablestest.NewFake()
  1143. fp := NewFakeProxier(ipt, false)
  1144. makeServiceMap(fp,
  1145. makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
  1146. svc.Spec.Type = v1.ServiceTypeExternalName
  1147. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  1148. svc.Spec.ExternalName = "foo2.bar.com"
  1149. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
  1150. }),
  1151. )
  1152. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1153. if len(fp.serviceMap) != 0 {
  1154. t.Errorf("expected service map length 0, got %v", fp.serviceMap)
  1155. }
  1156. // No proxied services, so no healthchecks
  1157. if len(result.HCServiceNodePorts) != 0 {
  1158. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1159. }
  1160. if len(result.UDPStaleClusterIP) != 0 {
  1161. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
  1162. }
  1163. }
  1164. func TestBuildServiceMapServiceUpdate(t *testing.T) {
  1165. ipt := iptablestest.NewFake()
  1166. fp := NewFakeProxier(ipt, false)
  1167. servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1168. svc.Spec.Type = v1.ServiceTypeClusterIP
  1169. svc.Spec.ClusterIP = "172.16.55.4"
  1170. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1171. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
  1172. })
  1173. servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1174. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1175. svc.Spec.ClusterIP = "172.16.55.4"
  1176. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1177. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
  1178. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
  1179. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1180. Ingress: []v1.LoadBalancerIngress{
  1181. {IP: "10.1.2.3"},
  1182. },
  1183. }
  1184. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1185. svc.Spec.HealthCheckNodePort = 345
  1186. })
  1187. fp.OnServiceAdd(servicev1)
  1188. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1189. if len(fp.serviceMap) != 2 {
  1190. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1191. }
  1192. if len(result.HCServiceNodePorts) != 0 {
  1193. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1194. }
  1195. if len(result.UDPStaleClusterIP) != 0 {
  1196. // Services only added, so nothing stale yet
  1197. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1198. }
  1199. // Change service to load-balancer
  1200. fp.OnServiceUpdate(servicev1, servicev2)
  1201. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1202. if len(fp.serviceMap) != 2 {
  1203. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1204. }
  1205. if len(result.HCServiceNodePorts) != 1 {
  1206. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1207. }
  1208. if len(result.UDPStaleClusterIP) != 0 {
  1209. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1210. }
  1211. // No change; make sure the service map stays the same and there are
  1212. // no health-check changes
  1213. fp.OnServiceUpdate(servicev2, servicev2)
  1214. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1215. if len(fp.serviceMap) != 2 {
  1216. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1217. }
  1218. if len(result.HCServiceNodePorts) != 1 {
  1219. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1220. }
  1221. if len(result.UDPStaleClusterIP) != 0 {
  1222. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1223. }
  1224. // And back to ClusterIP
  1225. fp.OnServiceUpdate(servicev2, servicev1)
  1226. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1227. if len(fp.serviceMap) != 2 {
  1228. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1229. }
  1230. if len(result.HCServiceNodePorts) != 0 {
  1231. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1232. }
  1233. if len(result.UDPStaleClusterIP) != 0 {
  1234. // Services only added, so nothing stale yet
  1235. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1236. }
  1237. }
  1238. func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
  1239. ept := &v1.Endpoints{
  1240. ObjectMeta: metav1.ObjectMeta{
  1241. Name: name,
  1242. Namespace: namespace,
  1243. },
  1244. }
  1245. eptFunc(ept)
  1246. return ept
  1247. }
  1248. func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
  1249. for i := range allEndpoints {
  1250. proxier.OnEndpointsAdd(allEndpoints[i])
  1251. }
  1252. proxier.mu.Lock()
  1253. defer proxier.mu.Unlock()
  1254. proxier.endpointsSynced = true
  1255. }
  1256. func makeNSN(namespace, name string) types.NamespacedName {
  1257. return types.NamespacedName{Namespace: namespace, Name: name}
  1258. }
  1259. func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
  1260. return proxy.ServicePortName{
  1261. NamespacedName: makeNSN(ns, name),
  1262. Port: port,
  1263. Protocol: protocol,
  1264. }
  1265. }
  1266. func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
  1267. for i := range allServices {
  1268. proxier.OnServiceAdd(allServices[i])
  1269. }
  1270. proxier.mu.Lock()
  1271. defer proxier.mu.Unlock()
  1272. proxier.servicesSynced = true
  1273. }
  1274. func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
  1275. if len(newMap) != len(expected) {
  1276. t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
  1277. }
  1278. for x := range expected {
  1279. if len(newMap[x]) != len(expected[x]) {
  1280. t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
  1281. } else {
  1282. for i := range expected[x] {
  1283. newEp, ok := newMap[x][i].(*endpointsInfo)
  1284. if !ok {
  1285. t.Errorf("Failed to cast endpointsInfo")
  1286. continue
  1287. }
  1288. if newEp.Endpoint != expected[x][i].Endpoint ||
  1289. newEp.IsLocal != expected[x][i].IsLocal ||
  1290. newEp.protocol != expected[x][i].protocol ||
  1291. newEp.chainName != expected[x][i].chainName {
  1292. t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
  1293. }
  1294. }
  1295. }
  1296. }
  1297. }
  1298. func Test_updateEndpointsMap(t *testing.T) {
  1299. var nodeName = testHostname
  1300. emptyEndpoint := func(ept *v1.Endpoints) {
  1301. ept.Subsets = []v1.EndpointSubset{}
  1302. }
  1303. unnamedPort := func(ept *v1.Endpoints) {
  1304. ept.Subsets = []v1.EndpointSubset{{
  1305. Addresses: []v1.EndpointAddress{{
  1306. IP: "1.1.1.1",
  1307. }},
  1308. Ports: []v1.EndpointPort{{
  1309. Port: 11,
  1310. Protocol: v1.ProtocolUDP,
  1311. }},
  1312. }}
  1313. }
  1314. unnamedPortLocal := func(ept *v1.Endpoints) {
  1315. ept.Subsets = []v1.EndpointSubset{{
  1316. Addresses: []v1.EndpointAddress{{
  1317. IP: "1.1.1.1",
  1318. NodeName: &nodeName,
  1319. }},
  1320. Ports: []v1.EndpointPort{{
  1321. Port: 11,
  1322. Protocol: v1.ProtocolUDP,
  1323. }},
  1324. }}
  1325. }
  1326. namedPortLocal := func(ept *v1.Endpoints) {
  1327. ept.Subsets = []v1.EndpointSubset{{
  1328. Addresses: []v1.EndpointAddress{{
  1329. IP: "1.1.1.1",
  1330. NodeName: &nodeName,
  1331. }},
  1332. Ports: []v1.EndpointPort{{
  1333. Name: "p11",
  1334. Port: 11,
  1335. Protocol: v1.ProtocolUDP,
  1336. }},
  1337. }}
  1338. }
  1339. namedPort := func(ept *v1.Endpoints) {
  1340. ept.Subsets = []v1.EndpointSubset{{
  1341. Addresses: []v1.EndpointAddress{{
  1342. IP: "1.1.1.1",
  1343. }},
  1344. Ports: []v1.EndpointPort{{
  1345. Name: "p11",
  1346. Port: 11,
  1347. Protocol: v1.ProtocolUDP,
  1348. }},
  1349. }}
  1350. }
  1351. namedPortRenamed := func(ept *v1.Endpoints) {
  1352. ept.Subsets = []v1.EndpointSubset{{
  1353. Addresses: []v1.EndpointAddress{{
  1354. IP: "1.1.1.1",
  1355. }},
  1356. Ports: []v1.EndpointPort{{
  1357. Name: "p11-2",
  1358. Port: 11,
  1359. Protocol: v1.ProtocolUDP,
  1360. }},
  1361. }}
  1362. }
  1363. namedPortRenumbered := func(ept *v1.Endpoints) {
  1364. ept.Subsets = []v1.EndpointSubset{{
  1365. Addresses: []v1.EndpointAddress{{
  1366. IP: "1.1.1.1",
  1367. }},
  1368. Ports: []v1.EndpointPort{{
  1369. Name: "p11",
  1370. Port: 22,
  1371. Protocol: v1.ProtocolUDP,
  1372. }},
  1373. }}
  1374. }
  1375. namedPortsLocalNoLocal := func(ept *v1.Endpoints) {
  1376. ept.Subsets = []v1.EndpointSubset{{
  1377. Addresses: []v1.EndpointAddress{{
  1378. IP: "1.1.1.1",
  1379. }, {
  1380. IP: "1.1.1.2",
  1381. NodeName: &nodeName,
  1382. }},
  1383. Ports: []v1.EndpointPort{{
  1384. Name: "p11",
  1385. Port: 11,
  1386. Protocol: v1.ProtocolUDP,
  1387. }, {
  1388. Name: "p12",
  1389. Port: 12,
  1390. Protocol: v1.ProtocolUDP,
  1391. }},
  1392. }}
  1393. }
  1394. multipleSubsets := func(ept *v1.Endpoints) {
  1395. ept.Subsets = []v1.EndpointSubset{{
  1396. Addresses: []v1.EndpointAddress{{
  1397. IP: "1.1.1.1",
  1398. }},
  1399. Ports: []v1.EndpointPort{{
  1400. Name: "p11",
  1401. Port: 11,
  1402. Protocol: v1.ProtocolUDP,
  1403. }},
  1404. }, {
  1405. Addresses: []v1.EndpointAddress{{
  1406. IP: "1.1.1.2",
  1407. }},
  1408. Ports: []v1.EndpointPort{{
  1409. Name: "p12",
  1410. Port: 12,
  1411. Protocol: v1.ProtocolUDP,
  1412. }},
  1413. }}
  1414. }
  1415. multipleSubsetsWithLocal := func(ept *v1.Endpoints) {
  1416. ept.Subsets = []v1.EndpointSubset{{
  1417. Addresses: []v1.EndpointAddress{{
  1418. IP: "1.1.1.1",
  1419. }},
  1420. Ports: []v1.EndpointPort{{
  1421. Name: "p11",
  1422. Port: 11,
  1423. Protocol: v1.ProtocolUDP,
  1424. }},
  1425. }, {
  1426. Addresses: []v1.EndpointAddress{{
  1427. IP: "1.1.1.2",
  1428. NodeName: &nodeName,
  1429. }},
  1430. Ports: []v1.EndpointPort{{
  1431. Name: "p12",
  1432. Port: 12,
  1433. Protocol: v1.ProtocolUDP,
  1434. }},
  1435. }}
  1436. }
  1437. multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) {
  1438. ept.Subsets = []v1.EndpointSubset{{
  1439. Addresses: []v1.EndpointAddress{{
  1440. IP: "1.1.1.1",
  1441. NodeName: &nodeName,
  1442. }},
  1443. Ports: []v1.EndpointPort{{
  1444. Name: "p11",
  1445. Port: 11,
  1446. Protocol: v1.ProtocolUDP,
  1447. }, {
  1448. Name: "p12",
  1449. Port: 12,
  1450. Protocol: v1.ProtocolUDP,
  1451. }},
  1452. }, {
  1453. Addresses: []v1.EndpointAddress{{
  1454. IP: "1.1.1.3",
  1455. }},
  1456. Ports: []v1.EndpointPort{{
  1457. Name: "p13",
  1458. Port: 13,
  1459. Protocol: v1.ProtocolUDP,
  1460. }},
  1461. }}
  1462. }
  1463. multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) {
  1464. ept.Subsets = []v1.EndpointSubset{{
  1465. Addresses: []v1.EndpointAddress{{
  1466. IP: "1.1.1.1",
  1467. }, {
  1468. IP: "1.1.1.2",
  1469. NodeName: &nodeName,
  1470. }},
  1471. Ports: []v1.EndpointPort{{
  1472. Name: "p11",
  1473. Port: 11,
  1474. Protocol: v1.ProtocolUDP,
  1475. }, {
  1476. Name: "p12",
  1477. Port: 12,
  1478. Protocol: v1.ProtocolUDP,
  1479. }},
  1480. }, {
  1481. Addresses: []v1.EndpointAddress{{
  1482. IP: "1.1.1.3",
  1483. }, {
  1484. IP: "1.1.1.4",
  1485. NodeName: &nodeName,
  1486. }},
  1487. Ports: []v1.EndpointPort{{
  1488. Name: "p13",
  1489. Port: 13,
  1490. Protocol: v1.ProtocolUDP,
  1491. }, {
  1492. Name: "p14",
  1493. Port: 14,
  1494. Protocol: v1.ProtocolUDP,
  1495. }},
  1496. }}
  1497. }
  1498. multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) {
  1499. ept.Subsets = []v1.EndpointSubset{{
  1500. Addresses: []v1.EndpointAddress{{
  1501. IP: "2.2.2.1",
  1502. }, {
  1503. IP: "2.2.2.2",
  1504. NodeName: &nodeName,
  1505. }},
  1506. Ports: []v1.EndpointPort{{
  1507. Name: "p21",
  1508. Port: 21,
  1509. Protocol: v1.ProtocolUDP,
  1510. }, {
  1511. Name: "p22",
  1512. Port: 22,
  1513. Protocol: v1.ProtocolUDP,
  1514. }},
  1515. }}
  1516. }
  1517. complexBefore1 := func(ept *v1.Endpoints) {
  1518. ept.Subsets = []v1.EndpointSubset{{
  1519. Addresses: []v1.EndpointAddress{{
  1520. IP: "1.1.1.1",
  1521. }},
  1522. Ports: []v1.EndpointPort{{
  1523. Name: "p11",
  1524. Port: 11,
  1525. Protocol: v1.ProtocolUDP,
  1526. }},
  1527. }}
  1528. }
  1529. complexBefore2 := func(ept *v1.Endpoints) {
  1530. ept.Subsets = []v1.EndpointSubset{{
  1531. Addresses: []v1.EndpointAddress{{
  1532. IP: "2.2.2.2",
  1533. NodeName: &nodeName,
  1534. }, {
  1535. IP: "2.2.2.22",
  1536. NodeName: &nodeName,
  1537. }},
  1538. Ports: []v1.EndpointPort{{
  1539. Name: "p22",
  1540. Port: 22,
  1541. Protocol: v1.ProtocolUDP,
  1542. }},
  1543. }, {
  1544. Addresses: []v1.EndpointAddress{{
  1545. IP: "2.2.2.3",
  1546. NodeName: &nodeName,
  1547. }},
  1548. Ports: []v1.EndpointPort{{
  1549. Name: "p23",
  1550. Port: 23,
  1551. Protocol: v1.ProtocolUDP,
  1552. }},
  1553. }}
  1554. }
  1555. complexBefore4 := func(ept *v1.Endpoints) {
  1556. ept.Subsets = []v1.EndpointSubset{{
  1557. Addresses: []v1.EndpointAddress{{
  1558. IP: "4.4.4.4",
  1559. NodeName: &nodeName,
  1560. }, {
  1561. IP: "4.4.4.5",
  1562. NodeName: &nodeName,
  1563. }},
  1564. Ports: []v1.EndpointPort{{
  1565. Name: "p44",
  1566. Port: 44,
  1567. Protocol: v1.ProtocolUDP,
  1568. }},
  1569. }, {
  1570. Addresses: []v1.EndpointAddress{{
  1571. IP: "4.4.4.6",
  1572. NodeName: &nodeName,
  1573. }},
  1574. Ports: []v1.EndpointPort{{
  1575. Name: "p45",
  1576. Port: 45,
  1577. Protocol: v1.ProtocolUDP,
  1578. }},
  1579. }}
  1580. }
  1581. complexAfter1 := func(ept *v1.Endpoints) {
  1582. ept.Subsets = []v1.EndpointSubset{{
  1583. Addresses: []v1.EndpointAddress{{
  1584. IP: "1.1.1.1",
  1585. }, {
  1586. IP: "1.1.1.11",
  1587. }},
  1588. Ports: []v1.EndpointPort{{
  1589. Name: "p11",
  1590. Port: 11,
  1591. Protocol: v1.ProtocolUDP,
  1592. }},
  1593. }, {
  1594. Addresses: []v1.EndpointAddress{{
  1595. IP: "1.1.1.2",
  1596. }},
  1597. Ports: []v1.EndpointPort{{
  1598. Name: "p12",
  1599. Port: 12,
  1600. Protocol: v1.ProtocolUDP,
  1601. }, {
  1602. Name: "p122",
  1603. Port: 122,
  1604. Protocol: v1.ProtocolUDP,
  1605. }},
  1606. }}
  1607. }
  1608. complexAfter3 := func(ept *v1.Endpoints) {
  1609. ept.Subsets = []v1.EndpointSubset{{
  1610. Addresses: []v1.EndpointAddress{{
  1611. IP: "3.3.3.3",
  1612. }},
  1613. Ports: []v1.EndpointPort{{
  1614. Name: "p33",
  1615. Port: 33,
  1616. Protocol: v1.ProtocolUDP,
  1617. }},
  1618. }}
  1619. }
  1620. complexAfter4 := func(ept *v1.Endpoints) {
  1621. ept.Subsets = []v1.EndpointSubset{{
  1622. Addresses: []v1.EndpointAddress{{
  1623. IP: "4.4.4.4",
  1624. NodeName: &nodeName,
  1625. }},
  1626. Ports: []v1.EndpointPort{{
  1627. Name: "p44",
  1628. Port: 44,
  1629. Protocol: v1.ProtocolUDP,
  1630. }},
  1631. }}
  1632. }
  1633. testCases := []struct {
  1634. // previousEndpoints and currentEndpoints are used to call appropriate
  1635. // handlers OnEndpoints* (based on whether corresponding values are nil
  1636. // or non-nil) and must be of equal length.
  1637. previousEndpoints []*v1.Endpoints
  1638. currentEndpoints []*v1.Endpoints
  1639. oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
  1640. expectedResult map[proxy.ServicePortName][]*endpointsInfo
  1641. expectedStaleEndpoints []proxy.ServiceEndpoint
  1642. expectedStaleServiceNames map[proxy.ServicePortName]bool
  1643. expectedHealthchecks map[types.NamespacedName]int
  1644. }{{
  1645. // Case[0]: nothing
  1646. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1647. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1648. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1649. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1650. expectedHealthchecks: map[types.NamespacedName]int{},
  1651. }, {
  1652. // Case[1]: no change, unnamed port
  1653. previousEndpoints: []*v1.Endpoints{
  1654. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1655. },
  1656. currentEndpoints: []*v1.Endpoints{
  1657. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1658. },
  1659. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1660. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1661. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1662. },
  1663. },
  1664. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1665. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1666. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1667. },
  1668. },
  1669. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1670. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1671. expectedHealthchecks: map[types.NamespacedName]int{},
  1672. }, {
  1673. // Case[2]: no change, named port, local
  1674. previousEndpoints: []*v1.Endpoints{
  1675. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1676. },
  1677. currentEndpoints: []*v1.Endpoints{
  1678. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1679. },
  1680. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1681. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1682. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1683. },
  1684. },
  1685. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1686. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1687. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1688. },
  1689. },
  1690. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1691. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1692. expectedHealthchecks: map[types.NamespacedName]int{
  1693. makeNSN("ns1", "ep1"): 1,
  1694. },
  1695. }, {
  1696. // Case[3]: no change, multiple subsets
  1697. previousEndpoints: []*v1.Endpoints{
  1698. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1699. },
  1700. currentEndpoints: []*v1.Endpoints{
  1701. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1702. },
  1703. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1704. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1705. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1706. },
  1707. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1708. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1709. },
  1710. },
  1711. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1712. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1713. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1714. },
  1715. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1716. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1717. },
  1718. },
  1719. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1720. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1721. expectedHealthchecks: map[types.NamespacedName]int{},
  1722. }, {
  1723. // Case[4]: no change, multiple subsets, multiple ports, local
  1724. previousEndpoints: []*v1.Endpoints{
  1725. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1726. },
  1727. currentEndpoints: []*v1.Endpoints{
  1728. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1729. },
  1730. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1731. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1732. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1733. },
  1734. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1735. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1736. },
  1737. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1738. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1739. },
  1740. },
  1741. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1742. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1743. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1744. },
  1745. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1746. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1747. },
  1748. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1749. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1750. },
  1751. },
  1752. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1753. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1754. expectedHealthchecks: map[types.NamespacedName]int{
  1755. makeNSN("ns1", "ep1"): 1,
  1756. },
  1757. }, {
  1758. // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
  1759. previousEndpoints: []*v1.Endpoints{
  1760. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1761. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1762. },
  1763. currentEndpoints: []*v1.Endpoints{
  1764. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1765. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1766. },
  1767. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1768. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1769. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1770. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1771. },
  1772. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1773. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1774. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1775. },
  1776. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1777. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1778. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1779. },
  1780. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  1781. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1782. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1783. },
  1784. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  1785. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1786. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1787. },
  1788. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  1789. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1790. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1791. },
  1792. },
  1793. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1794. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1795. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1796. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1797. },
  1798. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1799. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1800. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1801. },
  1802. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  1803. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1804. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1805. },
  1806. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  1807. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1808. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1809. },
  1810. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  1811. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1812. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1813. },
  1814. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  1815. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1816. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1817. },
  1818. },
  1819. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1820. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1821. expectedHealthchecks: map[types.NamespacedName]int{
  1822. makeNSN("ns1", "ep1"): 2,
  1823. makeNSN("ns2", "ep2"): 1,
  1824. },
  1825. }, {
  1826. // Case[6]: add an Endpoints
  1827. previousEndpoints: []*v1.Endpoints{
  1828. nil,
  1829. },
  1830. currentEndpoints: []*v1.Endpoints{
  1831. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1832. },
  1833. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1834. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1835. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1836. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1837. },
  1838. },
  1839. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1840. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1841. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  1842. },
  1843. expectedHealthchecks: map[types.NamespacedName]int{
  1844. makeNSN("ns1", "ep1"): 1,
  1845. },
  1846. }, {
  1847. // Case[7]: remove an Endpoints
  1848. previousEndpoints: []*v1.Endpoints{
  1849. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1850. },
  1851. currentEndpoints: []*v1.Endpoints{
  1852. nil,
  1853. },
  1854. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1855. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  1856. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1857. },
  1858. },
  1859. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1860. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1861. Endpoint: "1.1.1.1:11",
  1862. ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
  1863. }},
  1864. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1865. expectedHealthchecks: map[types.NamespacedName]int{},
  1866. }, {
  1867. // Case[8]: add an IP and port
  1868. previousEndpoints: []*v1.Endpoints{
  1869. makeTestEndpoints("ns1", "ep1", namedPort),
  1870. },
  1871. currentEndpoints: []*v1.Endpoints{
  1872. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1873. },
  1874. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1875. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1876. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1877. },
  1878. },
  1879. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1880. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1881. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1882. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1883. },
  1884. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1885. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1886. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1887. },
  1888. },
  1889. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1890. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1891. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  1892. },
  1893. expectedHealthchecks: map[types.NamespacedName]int{
  1894. makeNSN("ns1", "ep1"): 1,
  1895. },
  1896. }, {
  1897. // Case[9]: remove an IP and port
  1898. previousEndpoints: []*v1.Endpoints{
  1899. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1900. },
  1901. currentEndpoints: []*v1.Endpoints{
  1902. makeTestEndpoints("ns1", "ep1", namedPort),
  1903. },
  1904. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1905. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1906. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1907. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1908. },
  1909. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1910. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1911. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1912. },
  1913. },
  1914. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1915. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1916. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1917. },
  1918. },
  1919. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1920. Endpoint: "1.1.1.2:11",
  1921. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  1922. }, {
  1923. Endpoint: "1.1.1.1:12",
  1924. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1925. }, {
  1926. Endpoint: "1.1.1.2:12",
  1927. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1928. }},
  1929. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1930. expectedHealthchecks: map[types.NamespacedName]int{},
  1931. }, {
  1932. // Case[10]: add a subset
  1933. previousEndpoints: []*v1.Endpoints{
  1934. makeTestEndpoints("ns1", "ep1", namedPort),
  1935. },
  1936. currentEndpoints: []*v1.Endpoints{
  1937. makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
  1938. },
  1939. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1940. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1941. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1942. },
  1943. },
  1944. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1945. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1946. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1947. },
  1948. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1949. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1950. },
  1951. },
  1952. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1953. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1954. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  1955. },
  1956. expectedHealthchecks: map[types.NamespacedName]int{
  1957. makeNSN("ns1", "ep1"): 1,
  1958. },
  1959. }, {
  1960. // Case[11]: remove a subset
  1961. previousEndpoints: []*v1.Endpoints{
  1962. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1963. },
  1964. currentEndpoints: []*v1.Endpoints{
  1965. makeTestEndpoints("ns1", "ep1", namedPort),
  1966. },
  1967. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1968. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1969. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1970. },
  1971. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  1972. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1973. },
  1974. },
  1975. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1976. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1977. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1978. },
  1979. },
  1980. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1981. Endpoint: "1.1.1.2:12",
  1982. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  1983. }},
  1984. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1985. expectedHealthchecks: map[types.NamespacedName]int{},
  1986. }, {
  1987. // Case[12]: rename a port
  1988. previousEndpoints: []*v1.Endpoints{
  1989. makeTestEndpoints("ns1", "ep1", namedPort),
  1990. },
  1991. currentEndpoints: []*v1.Endpoints{
  1992. makeTestEndpoints("ns1", "ep1", namedPortRenamed),
  1993. },
  1994. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1995. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  1996. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1997. },
  1998. },
  1999. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2000. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
  2001. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2002. },
  2003. },
  2004. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2005. Endpoint: "1.1.1.1:11",
  2006. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2007. }},
  2008. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2009. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
  2010. },
  2011. expectedHealthchecks: map[types.NamespacedName]int{},
  2012. }, {
  2013. // Case[13]: renumber a port
  2014. previousEndpoints: []*v1.Endpoints{
  2015. makeTestEndpoints("ns1", "ep1", namedPort),
  2016. },
  2017. currentEndpoints: []*v1.Endpoints{
  2018. makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
  2019. },
  2020. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  2021. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2022. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2023. },
  2024. },
  2025. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2026. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2027. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:22", IsLocal: false}},
  2028. },
  2029. },
  2030. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2031. Endpoint: "1.1.1.1:11",
  2032. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2033. }},
  2034. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2035. expectedHealthchecks: map[types.NamespacedName]int{},
  2036. }, {
  2037. // Case[14]: complex add and remove
  2038. previousEndpoints: []*v1.Endpoints{
  2039. makeTestEndpoints("ns1", "ep1", complexBefore1),
  2040. makeTestEndpoints("ns2", "ep2", complexBefore2),
  2041. nil,
  2042. makeTestEndpoints("ns4", "ep4", complexBefore4),
  2043. },
  2044. currentEndpoints: []*v1.Endpoints{
  2045. makeTestEndpoints("ns1", "ep1", complexAfter1),
  2046. nil,
  2047. makeTestEndpoints("ns3", "ep3", complexAfter3),
  2048. makeTestEndpoints("ns4", "ep4", complexAfter4),
  2049. },
  2050. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  2051. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2052. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2053. },
  2054. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  2055. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  2056. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true}},
  2057. },
  2058. makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): {
  2059. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true}},
  2060. },
  2061. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2062. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  2063. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: true}},
  2064. },
  2065. makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): {
  2066. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true}},
  2067. },
  2068. },
  2069. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2070. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2071. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2072. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.11:11", IsLocal: false}},
  2073. },
  2074. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2075. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  2076. },
  2077. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): {
  2078. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:122", IsLocal: false}},
  2079. },
  2080. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): {
  2081. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "3.3.3.3:33", IsLocal: false}},
  2082. },
  2083. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2084. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  2085. },
  2086. },
  2087. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2088. Endpoint: "2.2.2.2:22",
  2089. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2090. }, {
  2091. Endpoint: "2.2.2.22:22",
  2092. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2093. }, {
  2094. Endpoint: "2.2.2.3:23",
  2095. ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
  2096. }, {
  2097. Endpoint: "4.4.4.5:44",
  2098. ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
  2099. }, {
  2100. Endpoint: "4.4.4.6:45",
  2101. ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
  2102. }},
  2103. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2104. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  2105. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
  2106. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
  2107. },
  2108. expectedHealthchecks: map[types.NamespacedName]int{
  2109. makeNSN("ns4", "ep4"): 1,
  2110. },
  2111. }, {
  2112. // Case[15]: change from 0 endpoint address to 1 unnamed port
  2113. previousEndpoints: []*v1.Endpoints{
  2114. makeTestEndpoints("ns1", "ep1", emptyEndpoint),
  2115. },
  2116. currentEndpoints: []*v1.Endpoints{
  2117. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2118. },
  2119. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  2120. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2121. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2122. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2123. },
  2124. },
  2125. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2126. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2127. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  2128. },
  2129. expectedHealthchecks: map[types.NamespacedName]int{},
  2130. },
  2131. }
  2132. for tci, tc := range testCases {
  2133. ipt := iptablestest.NewFake()
  2134. fp := NewFakeProxier(ipt, false)
  2135. fp.hostname = nodeName
  2136. // First check that after adding all previous versions of endpoints,
  2137. // the fp.oldEndpoints is as we expect.
  2138. for i := range tc.previousEndpoints {
  2139. if tc.previousEndpoints[i] != nil {
  2140. fp.OnEndpointsAdd(tc.previousEndpoints[i])
  2141. }
  2142. }
  2143. fp.endpointsMap.Update(fp.endpointsChanges)
  2144. compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
  2145. // Now let's call appropriate handlers to get to state we want to be.
  2146. if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
  2147. t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
  2148. continue
  2149. }
  2150. for i := range tc.previousEndpoints {
  2151. prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
  2152. switch {
  2153. case prev == nil:
  2154. fp.OnEndpointsAdd(curr)
  2155. case curr == nil:
  2156. fp.OnEndpointsDelete(prev)
  2157. default:
  2158. fp.OnEndpointsUpdate(prev, curr)
  2159. }
  2160. }
  2161. result := fp.endpointsMap.Update(fp.endpointsChanges)
  2162. newMap := fp.endpointsMap
  2163. compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
  2164. if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
  2165. t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
  2166. }
  2167. for _, x := range tc.expectedStaleEndpoints {
  2168. found := false
  2169. for _, stale := range result.StaleEndpoints {
  2170. if stale == x {
  2171. found = true
  2172. break
  2173. }
  2174. }
  2175. if !found {
  2176. t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
  2177. }
  2178. }
  2179. if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
  2180. t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
  2181. }
  2182. for svcName := range tc.expectedStaleServiceNames {
  2183. found := false
  2184. for _, stale := range result.StaleServiceNames {
  2185. if stale == svcName {
  2186. found = true
  2187. }
  2188. }
  2189. if !found {
  2190. t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
  2191. }
  2192. }
  2193. if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
  2194. t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
  2195. }
  2196. }
  2197. }
  2198. // The majority of EndpointSlice specific tests are not iptables specific and focus on
  2199. // the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
  2200. // iptables proxier supports translating EndpointSlices to iptables output.
  2201. func TestEndpointSliceE2E(t *testing.T) {
  2202. expectedIPTablesWithSlice := `*filter
  2203. :KUBE-SERVICES - [0:0]
  2204. :KUBE-EXTERNAL-SERVICES - [0:0]
  2205. :KUBE-FORWARD - [0:0]
  2206. -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
  2207. -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
  2208. -A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  2209. -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  2210. COMMIT
  2211. *nat
  2212. :KUBE-SERVICES - [0:0]
  2213. :KUBE-NODEPORTS - [0:0]
  2214. :KUBE-POSTROUTING - [0:0]
  2215. :KUBE-MARK-MASQ - [0:0]
  2216. :KUBE-SVC-AHZNAGK3SCETOS2T - [0:0]
  2217. :KUBE-SEP-PXD6POUVGD2I37UY - [0:0]
  2218. :KUBE-SEP-SOKZUIT7SCEVIP33 - [0:0]
  2219. :KUBE-SEP-WVE3FAB34S7NZGDJ - [0:0]
  2220. -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE
  2221. -A KUBE-MARK-MASQ -j MARK --set-xmark
  2222. -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
  2223. -A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m tcp -p tcp -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-AHZNAGK3SCETOS2T
  2224. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.3333333333 -j KUBE-SEP-PXD6POUVGD2I37UY
  2225. -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -s 10.0.1.1/32 -j KUBE-MARK-MASQ
  2226. -A KUBE-SEP-PXD6POUVGD2I37UY -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
  2227. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-SOKZUIT7SCEVIP33
  2228. -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -s 10.0.1.2/32 -j KUBE-MARK-MASQ
  2229. -A KUBE-SEP-SOKZUIT7SCEVIP33 -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.2:80
  2230. -A KUBE-SVC-AHZNAGK3SCETOS2T -m comment --comment ns1/svc1: -j KUBE-SEP-WVE3FAB34S7NZGDJ
  2231. -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -s 10.0.1.3/32 -j KUBE-MARK-MASQ
  2232. -A KUBE-SEP-WVE3FAB34S7NZGDJ -m comment --comment ns1/svc1: -m tcp -p tcp -j DNAT --to-destination 10.0.1.3:80
  2233. -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
  2234. COMMIT
  2235. `
  2236. ipt := iptablestest.NewFake()
  2237. fp := NewFakeProxier(ipt, true)
  2238. fp.OnServiceSynced()
  2239. fp.OnEndpointsSynced()
  2240. fp.OnEndpointSlicesSynced()
  2241. serviceName := "svc1"
  2242. namespaceName := "ns1"
  2243. fp.OnServiceAdd(&v1.Service{
  2244. ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
  2245. Spec: v1.ServiceSpec{
  2246. ClusterIP: "172.20.1.1",
  2247. Selector: map[string]string{"foo": "bar"},
  2248. Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
  2249. },
  2250. })
  2251. tcpProtocol := v1.ProtocolTCP
  2252. endpointSlice := &discovery.EndpointSlice{
  2253. ObjectMeta: metav1.ObjectMeta{
  2254. Name: fmt.Sprintf("%s-1", serviceName),
  2255. Namespace: namespaceName,
  2256. Labels: map[string]string{discovery.LabelServiceName: serviceName},
  2257. },
  2258. Ports: []discovery.EndpointPort{{
  2259. Name: utilpointer.StringPtr(""),
  2260. Port: utilpointer.Int32Ptr(80),
  2261. Protocol: &tcpProtocol,
  2262. }},
  2263. AddressType: discovery.AddressTypeIPv4,
  2264. Endpoints: []discovery.Endpoint{{
  2265. Addresses: []string{"10.0.1.1"},
  2266. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2267. Topology: map[string]string{"kubernetes.io/hostname": testHostname},
  2268. }, {
  2269. Addresses: []string{"10.0.1.2"},
  2270. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2271. Topology: map[string]string{"kubernetes.io/hostname": "node2"},
  2272. }, {
  2273. Addresses: []string{"10.0.1.3"},
  2274. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  2275. Topology: map[string]string{"kubernetes.io/hostname": "node3"},
  2276. }},
  2277. }
  2278. fp.OnEndpointSliceAdd(endpointSlice)
  2279. fp.syncProxyRules()
  2280. assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String())
  2281. fp.OnEndpointSliceDelete(endpointSlice)
  2282. fp.syncProxyRules()
  2283. assert.NotEqual(t, expectedIPTablesWithSlice, fp.iptablesData.String())
  2284. }
  2285. // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.