proxier_test.go 75 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package iptables
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "strconv"
  20. "strings"
  21. "testing"
  22. "time"
  23. "k8s.io/klog"
  24. "k8s.io/api/core/v1"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/types"
  27. "k8s.io/apimachinery/pkg/util/intstr"
  28. "k8s.io/kubernetes/pkg/proxy"
  29. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  30. utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
  31. "k8s.io/kubernetes/pkg/util/async"
  32. "k8s.io/kubernetes/pkg/util/conntrack"
  33. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  34. iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
  35. "k8s.io/utils/exec"
  36. fakeexec "k8s.io/utils/exec/testing"
  37. utilpointer "k8s.io/utils/pointer"
  38. )
  39. func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
  40. chainLines := utiliptables.GetChainLines(table, save)
  41. for chain, lineBytes := range chainLines {
  42. line := string(lineBytes)
  43. if expected, exists := expectedLines[chain]; exists {
  44. if expected != line {
  45. t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
  46. }
  47. } else {
  48. t.Errorf("getChainLines expected chain not present: %s", chain)
  49. }
  50. }
  51. }
// TestGetChainLines feeds a single-table (nat) iptables-save dump through
// checkAllLines and expects exactly the three chain declaration lines
// (PREROUTING, POSTROUTING, OUTPUT) to be reported for the NAT table.
func TestGetChainLines(t *testing.T) {
	// Fixture: iptables-save output containing only the nat table.
	iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
*nat
:PREROUTING ACCEPT [2136997:197881818]
:POSTROUTING ACCEPT [4284525:258542680]
:OUTPUT ACCEPT [5901660:357267963]
-A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
COMMIT
# Completed on Wed Oct 29 14:56:01 2014`
	// Each chain maps to its full declaration line, counters included.
	expected := map[utiliptables.Chain]string{
		utiliptables.ChainPrerouting:  ":PREROUTING ACCEPT [2136997:197881818]",
		utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
		utiliptables.ChainOutput:      ":OUTPUT ACCEPT [5901660:357267963]",
	}
	checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
}
// TestGetChainLinesMultipleTables feeds an iptables-save dump that contains
// both a nat and a filter table through checkAllLines, asking for the NAT
// table only. The expected map lists the nat chains exclusively, so the
// filter table's INPUT/OUTPUT/DOCKER lines (which carry different counters)
// must not appear in the result.
func TestGetChainLinesMultipleTables(t *testing.T) {
	// Fixture: the nat table first, followed by the filter table.
	iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
*nat
:PREROUTING ACCEPT [2:138]
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
:POSTROUTING ACCEPT [0:0]
:DOCKER - [0:0]
:KUBE-NODEPORT-CONTAINER - [0:0]
:KUBE-NODEPORT-HOST - [0:0]
:KUBE-PORTALS-CONTAINER - [0:0]
:KUBE-PORTALS-HOST - [0:0]
:KUBE-SVC-1111111111111111 - [0:0]
:KUBE-SVC-2222222222222222 - [0:0]
:KUBE-SVC-3333333333333333 - [0:0]
:KUBE-SVC-4444444444444444 - [0:0]
:KUBE-SVC-5555555555555555 - [0:0]
:KUBE-SVC-6666666666666666 - [0:0]
-A PREROUTING -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-CONTAINER
-A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
-A PREROUTING -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-CONTAINER
-A OUTPUT -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-HOST
-A OUTPUT ! -d 127.0.0.0/8 -m addrtype --dst-type LOCAL -j DOCKER
-A OUTPUT -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-HOST
-A POSTROUTING -s 10.246.1.0/24 ! -o cbr0 -j MASQUERADE
-A POSTROUTING -s 10.0.2.15/32 -d 10.0.2.15/32 -m comment --comment "handle pod connecting to self" -j MASQUERADE
-A KUBE-PORTALS-CONTAINER -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
-A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
-A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
-A KUBE-PORTALS-HOST -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
-A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
-A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
-A KUBE-SVC-1111111111111111 -p udp -m comment --comment "kube-system/kube-dns:dns" -m recent --set --name KUBE-SVC-1111111111111111 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
-A KUBE-SVC-2222222222222222 -m comment --comment "kube-system/kube-dns:dns-tcp" -j KUBE-SVC-3333333333333333
-A KUBE-SVC-3333333333333333 -p tcp -m comment --comment "kube-system/kube-dns:dns-tcp" -m recent --set --name KUBE-SVC-3333333333333333 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
-A KUBE-SVC-4444444444444444 -p tcp -m comment --comment "default/kubernetes:" -m recent --set --name KUBE-SVC-4444444444444444 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.245.1.2:443
-A KUBE-SVC-5555555555555555 -m comment --comment "default/kubernetes:" -j KUBE-SVC-4444444444444444
-A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
COMMIT
# Completed on Fri Aug 7 14:47:37 2015
# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
*filter
:INPUT ACCEPT [17514:83115836]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [8909:688225]
:DOCKER - [0:0]
-A FORWARD -o cbr0 -j DOCKER
-A FORWARD -o cbr0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A FORWARD -i cbr0 ! -o cbr0 -j ACCEPT
-A FORWARD -i cbr0 -o cbr0 -j ACCEPT
COMMIT
`
	// Only chains declared in the nat section are expected; the values are
	// the nat table's declaration lines, not the filter table's.
	expected := map[utiliptables.Chain]string{
		utiliptables.ChainPrerouting:                    ":PREROUTING ACCEPT [2:138]",
		utiliptables.Chain("INPUT"):                     ":INPUT ACCEPT [0:0]",
		utiliptables.Chain("OUTPUT"):                    ":OUTPUT ACCEPT [0:0]",
		utiliptables.ChainPostrouting:                   ":POSTROUTING ACCEPT [0:0]",
		utiliptables.Chain("DOCKER"):                    ":DOCKER - [0:0]",
		utiliptables.Chain("KUBE-NODEPORT-CONTAINER"):   ":KUBE-NODEPORT-CONTAINER - [0:0]",
		utiliptables.Chain("KUBE-NODEPORT-HOST"):        ":KUBE-NODEPORT-HOST - [0:0]",
		utiliptables.Chain("KUBE-PORTALS-CONTAINER"):    ":KUBE-PORTALS-CONTAINER - [0:0]",
		utiliptables.Chain("KUBE-PORTALS-HOST"):         ":KUBE-PORTALS-HOST - [0:0]",
		utiliptables.Chain("KUBE-SVC-1111111111111111"): ":KUBE-SVC-1111111111111111 - [0:0]",
		utiliptables.Chain("KUBE-SVC-2222222222222222"): ":KUBE-SVC-2222222222222222 - [0:0]",
		utiliptables.Chain("KUBE-SVC-3333333333333333"): ":KUBE-SVC-3333333333333333 - [0:0]",
		utiliptables.Chain("KUBE-SVC-4444444444444444"): ":KUBE-SVC-4444444444444444 - [0:0]",
		utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
		utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
	}
	checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
}
  139. func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol v1.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
  140. return &serviceInfo{
  141. BaseServiceInfo: &proxy.BaseServiceInfo{
  142. SessionAffinityType: v1.ServiceAffinityNone, // default
  143. StickyMaxAgeSeconds: int(v1.DefaultClientIPServiceAffinitySeconds), // default
  144. ClusterIP: ip,
  145. Port: port,
  146. Protocol: protocol,
  147. OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
  148. },
  149. }
  150. }
  151. func TestDeleteEndpointConnections(t *testing.T) {
  152. const (
  153. UDP = v1.ProtocolUDP
  154. TCP = v1.ProtocolTCP
  155. SCTP = v1.ProtocolSCTP
  156. )
  157. testCases := []struct {
  158. description string
  159. svcName string
  160. svcIP string
  161. svcPort int
  162. protocol v1.Protocol
  163. endpoint string // IP:port endpoint
  164. epSvcPair proxy.ServiceEndpoint // Will be generated by test
  165. simulatedErr string
  166. }{
  167. {
  168. description: "V4 UDP",
  169. svcName: "v4-udp",
  170. svcIP: "10.96.1.1",
  171. svcPort: 80,
  172. protocol: UDP,
  173. endpoint: "10.240.0.3:80",
  174. }, {
  175. description: "V4 TCP",
  176. svcName: "v4-tcp",
  177. svcIP: "10.96.2.2",
  178. svcPort: 80,
  179. protocol: TCP,
  180. endpoint: "10.240.0.4:80",
  181. }, {
  182. description: "V4 SCTP",
  183. svcName: "v4-sctp",
  184. svcIP: "10.96.3.3",
  185. svcPort: 80,
  186. protocol: SCTP,
  187. endpoint: "10.240.0.5:80",
  188. }, {
  189. description: "V4 UDP, nothing to delete, benign error",
  190. svcName: "v4-udp-nothing-to-delete",
  191. svcIP: "10.96.1.1",
  192. svcPort: 80,
  193. protocol: UDP,
  194. endpoint: "10.240.0.3:80",
  195. simulatedErr: conntrack.NoConnectionToDelete,
  196. }, {
  197. description: "V4 UDP, unexpected error, should be glogged",
  198. svcName: "v4-udp-simulated-error",
  199. svcIP: "10.96.1.1",
  200. svcPort: 80,
  201. protocol: UDP,
  202. endpoint: "10.240.0.3:80",
  203. simulatedErr: "simulated error",
  204. }, {
  205. description: "V6 UDP",
  206. svcName: "v6-udp",
  207. svcIP: "fd00:1234::20",
  208. svcPort: 80,
  209. protocol: UDP,
  210. endpoint: "[2001:db8::2]:80",
  211. }, {
  212. description: "V6 TCP",
  213. svcName: "v6-tcp",
  214. svcIP: "fd00:1234::30",
  215. svcPort: 80,
  216. protocol: TCP,
  217. endpoint: "[2001:db8::3]:80",
  218. }, {
  219. description: "V6 SCTP",
  220. svcName: "v6-sctp",
  221. svcIP: "fd00:1234::40",
  222. svcPort: 80,
  223. protocol: SCTP,
  224. endpoint: "[2001:db8::4]:80",
  225. },
  226. }
  227. // Create a service map that has service info entries for all test cases
  228. // and generate an endpoint service pair for each test case
  229. serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort)
  230. for i, tc := range testCases {
  231. svc := proxy.ServicePortName{
  232. NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
  233. Port: "p80",
  234. }
  235. serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
  236. testCases[i].epSvcPair = proxy.ServiceEndpoint{
  237. Endpoint: tc.endpoint,
  238. ServicePortName: svc,
  239. }
  240. }
  241. // Create a fake executor for the conntrack utility. This should only be
  242. // invoked for UDP connections, since no conntrack cleanup is needed for TCP
  243. fcmd := fakeexec.FakeCmd{}
  244. fexec := fakeexec.FakeExec{
  245. LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
  246. }
  247. execFunc := func(cmd string, args ...string) exec.Cmd {
  248. return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
  249. }
  250. for _, tc := range testCases {
  251. if tc.protocol == UDP {
  252. var cmdOutput string
  253. var simErr error
  254. if tc.simulatedErr == "" {
  255. cmdOutput = "1 flow entries have been deleted"
  256. } else {
  257. simErr = fmt.Errorf(tc.simulatedErr)
  258. }
  259. cmdFunc := func() ([]byte, error) { return []byte(cmdOutput), simErr }
  260. fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
  261. fexec.CommandScript = append(fexec.CommandScript, execFunc)
  262. }
  263. }
  264. // Create a proxier using the fake conntrack executor and service map
  265. fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
  266. // Run the test cases
  267. for _, tc := range testCases {
  268. priorExecs := fexec.CommandCalls
  269. priorGlogErrs := klog.Stats.Error.Lines()
  270. input := []proxy.ServiceEndpoint{tc.epSvcPair}
  271. fakeProxier.deleteEndpointConnections(input)
  272. // For UDP connections, check the executed conntrack command
  273. var expExecs int
  274. if tc.protocol == UDP {
  275. isIPv6 := func(ip string) bool {
  276. netIP := net.ParseIP(ip)
  277. if netIP.To4() == nil {
  278. return true
  279. }
  280. return false
  281. }
  282. endpointIP := utilproxy.IPPart(tc.endpoint)
  283. expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
  284. if isIPv6(endpointIP) {
  285. expectCommand += " -f ipv6"
  286. }
  287. actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
  288. if actualCommand != expectCommand {
  289. t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
  290. }
  291. expExecs = 1
  292. }
  293. // Check the number of times conntrack was executed
  294. execs := fexec.CommandCalls - priorExecs
  295. if execs != expExecs {
  296. t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
  297. }
  298. // Check the number of new glog errors
  299. var expGlogErrs int64
  300. if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
  301. expGlogErrs = 1
  302. }
  303. glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
  304. if glogErrs != expGlogErrs {
  305. t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
  306. }
  307. }
  308. }
// fakePortOpener implements portOpener.
// It records every local port it is asked to open instead of binding it.
type fakePortOpener struct {
	// openPorts accumulates the ports passed to OpenLocalPort, in call order.
	openPorts []*utilproxy.LocalPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
// It appends lp to f.openPorts and always returns a nil Closeable and a nil
// error.
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
	f.openPorts = append(f.openPorts, lp)
	return nil, nil
}
// fakeHealthChecker is a test double that remembers the most recent maps
// handed to SyncServices and SyncEndpoints.
type fakeHealthChecker struct {
	// services holds the last map passed to SyncServices.
	services map[types.NamespacedName]uint16
	// endpoints holds the last map passed to SyncEndpoints.
	endpoints map[types.NamespacedName]int
}
  323. func newFakeHealthChecker() *fakeHealthChecker {
  324. return &fakeHealthChecker{
  325. services: map[types.NamespacedName]uint16{},
  326. endpoints: map[types.NamespacedName]int{},
  327. }
  328. }
// SyncServices replaces the recorded services map with newServices (the map
// is stored as-is, not copied) and always returns nil.
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
	fake.services = newServices
	return nil
}
// SyncEndpoints replaces the recorded endpoints map with newEndpoints (the
// map is stored as-is, not copied) and always returns nil.
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
	fake.endpoints = newEndpoints
	return nil
}
// testHostname is the hostname NewFakeProxier gives to the fake proxier and
// to its endpoint change tracker.
const testHostname = "test-hostname"
  338. func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
  339. // TODO: Call NewProxier after refactoring out the goroutine
  340. // invocation into a Run() method.
  341. p := &Proxier{
  342. exec: &fakeexec.FakeExec{},
  343. serviceMap: make(proxy.ServiceMap),
  344. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
  345. endpointsMap: make(proxy.EndpointsMap),
  346. endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil),
  347. iptables: ipt,
  348. clusterCIDR: "10.0.0.0/24",
  349. hostname: testHostname,
  350. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  351. portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
  352. healthChecker: newFakeHealthChecker(),
  353. precomputedProbabilities: make([]string, 0, 1001),
  354. iptablesData: bytes.NewBuffer(nil),
  355. existingFilterChainsData: bytes.NewBuffer(nil),
  356. filterChains: bytes.NewBuffer(nil),
  357. filterRules: bytes.NewBuffer(nil),
  358. natChains: bytes.NewBuffer(nil),
  359. natRules: bytes.NewBuffer(nil),
  360. nodePortAddresses: make([]string, 0),
  361. networkInterfacer: utilproxytest.NewFakeNetwork(),
  362. }
  363. p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
  364. return p
  365. }
  366. func hasSessionAffinityRule(rules []iptablestest.Rule) bool {
  367. for _, r := range rules {
  368. if _, ok := r[iptablestest.Recent]; ok {
  369. return true
  370. }
  371. }
  372. return false
  373. }
  374. func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
  375. destPortStr := strconv.Itoa(destPort)
  376. match := false
  377. for _, r := range rules {
  378. if r[iptablestest.Jump] == destChain {
  379. match = true
  380. if destIP != "" {
  381. if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
  382. return true
  383. }
  384. match = false
  385. }
  386. if destPort != 0 {
  387. if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
  388. return true
  389. }
  390. match = false
  391. }
  392. }
  393. }
  394. return match
  395. }
  396. func hasSrcType(rules []iptablestest.Rule, srcType string) bool {
  397. for _, r := range rules {
  398. if r[iptablestest.SrcType] != srcType {
  399. continue
  400. }
  401. return true
  402. }
  403. return false
  404. }
// TestHasJump is a table-driven test for hasJump. Each case supplies a rule
// set plus the chain, IP and port to look for, and the expected result.
func TestHasJump(t *testing.T) {
	testCases := map[string]struct {
		rules     []iptablestest.Rule
		destChain string
		destIP    string
		destPort  int
		expected  bool
	}{
		"case 1": {
			// Match the 1st rule(both dest IP and dest Port)
			rules: []iptablestest.Rule{
				{"-d ": "10.20.30.41/32", "--dport ": "80", "-p ": "tcp", "-j ": "REJECT"},
				{"--dport ": "3001", "-p ": "tcp", "-j ": "KUBE-MARK-MASQ"},
			},
			destChain: "REJECT",
			destIP:    "10.20.30.41",
			destPort:  80,
			expected:  true,
		},
		"case 2": {
			// Match the 2nd rule(dest Port)
			rules: []iptablestest.Rule{
				{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
				{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
			},
			destChain: "REJECT",
			destIP:    "",
			destPort:  3001,
			expected:  true,
		},
		"case 3": {
			// Match both dest IP and dest Port
			rules: []iptablestest.Rule{
				{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
			},
			destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
			destIP:    "1.2.3.4",
			destPort:  80,
			expected:  true,
		},
		"case 4": {
			// Match dest IP but doesn't match dest Port
			rules: []iptablestest.Rule{
				{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
			},
			destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
			destIP:    "1.2.3.4",
			destPort:  8080,
			expected:  false,
		},
		"case 5": {
			// Match dest Port but doesn't match dest IP
			rules: []iptablestest.Rule{
				{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
			},
			destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
			destIP:    "10.20.30.40",
			destPort:  80,
			expected:  false,
		},
		"case 6": {
			// Match the 2nd rule(dest IP)
			rules: []iptablestest.Rule{
				{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
				{"-d ": "1.2.3.4/32", "-p ": "tcp", "-j ": "REJECT"},
				{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
			},
			destChain: "REJECT",
			destIP:    "1.2.3.4",
			destPort:  8080,
			expected:  true,
		},
		"case 7": {
			// Match the 2nd rule(dest Port)
			rules: []iptablestest.Rule{
				{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
				{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
			},
			destChain: "REJECT",
			destIP:    "1.2.3.4",
			destPort:  3001,
			expected:  true,
		},
		"case 8": {
			// Match the 1st rule(dest IP)
			rules: []iptablestest.Rule{
				{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
				{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
			},
			destChain: "REJECT",
			destIP:    "10.20.30.41",
			destPort:  8080,
			expected:  true,
		},
		"case 9": {
			// No IP or port requested: a bare jump to the chain is enough
			rules: []iptablestest.Rule{
				{"-j ": "KUBE-SEP-LWSOSDSHMKPJHHJV"},
			},
			destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
			destIP:    "",
			destPort:  0,
			expected:  true,
		},
		"case 10": {
			// No IP or port requested, but the only jump goes to another chain
			rules: []iptablestest.Rule{
				{"-j ": "KUBE-SEP-FOO"},
			},
			destChain: "KUBE-SEP-BAR",
			destIP:    "",
			destPort:  0,
			expected:  false,
		},
	}

	// Map iteration order is random; each case is independent of the others.
	for k, tc := range testCases {
		if got := hasJump(tc.rules, tc.destChain, tc.destIP, tc.destPort); got != tc.expected {
			t.Errorf("%v: expected %v, got %v", k, tc.expected, got)
		}
	}
}
  524. func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
  525. for _, r := range rules {
  526. if r[iptablestest.ToDest] == endpoint {
  527. return true
  528. }
  529. }
  530. return false
  531. }
  532. func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
  533. for _, r := range rules {
  534. t.Logf("%q", r)
  535. }
  536. t.Errorf("%v", msg)
  537. }
  538. func TestClusterIPReject(t *testing.T) {
  539. ipt := iptablestest.NewFake()
  540. fp := NewFakeProxier(ipt)
  541. svcIP := "10.20.30.41"
  542. svcPort := 80
  543. svcPortName := proxy.ServicePortName{
  544. NamespacedName: makeNSN("ns1", "svc1"),
  545. Port: "p80",
  546. }
  547. makeServiceMap(fp,
  548. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  549. svc.Spec.ClusterIP = svcIP
  550. svc.Spec.Ports = []v1.ServicePort{{
  551. Name: svcPortName.Port,
  552. Port: int32(svcPort),
  553. Protocol: v1.ProtocolTCP,
  554. }}
  555. }),
  556. )
  557. makeEndpointsMap(fp)
  558. fp.syncProxyRules()
  559. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  560. svcRules := ipt.GetRules(svcChain)
  561. if len(svcRules) != 0 {
  562. errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
  563. }
  564. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  565. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
  566. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  567. }
  568. }
  569. func TestClusterIPEndpointsJump(t *testing.T) {
  570. ipt := iptablestest.NewFake()
  571. fp := NewFakeProxier(ipt)
  572. svcIP := "10.20.30.41"
  573. svcPort := 80
  574. svcPortName := proxy.ServicePortName{
  575. NamespacedName: makeNSN("ns1", "svc1"),
  576. Port: "p80",
  577. }
  578. makeServiceMap(fp,
  579. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  580. svc.Spec.ClusterIP = svcIP
  581. svc.Spec.Ports = []v1.ServicePort{{
  582. Name: svcPortName.Port,
  583. Port: int32(svcPort),
  584. Protocol: v1.ProtocolTCP,
  585. }}
  586. }),
  587. )
  588. epIP := "10.180.0.1"
  589. makeEndpointsMap(fp,
  590. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  591. ept.Subsets = []v1.EndpointSubset{{
  592. Addresses: []v1.EndpointAddress{{
  593. IP: epIP,
  594. }},
  595. Ports: []v1.EndpointPort{{
  596. Name: svcPortName.Port,
  597. Port: int32(svcPort),
  598. }},
  599. }}
  600. }),
  601. )
  602. fp.syncProxyRules()
  603. epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
  604. svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP))))
  605. epChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStr))
  606. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  607. if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
  608. errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
  609. }
  610. svcRules := ipt.GetRules(svcChain)
  611. if !hasJump(svcRules, epChain, "", 0) {
  612. errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
  613. }
  614. epRules := ipt.GetRules(epChain)
  615. if !hasDNAT(epRules, epStr) {
  616. errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
  617. }
  618. }
  619. func TestLoadBalancer(t *testing.T) {
  620. ipt := iptablestest.NewFake()
  621. fp := NewFakeProxier(ipt)
  622. svcIP := "10.20.30.41"
  623. svcPort := 80
  624. svcNodePort := 3001
  625. svcLBIP := "1.2.3.4"
  626. svcPortName := proxy.ServicePortName{
  627. NamespacedName: makeNSN("ns1", "svc1"),
  628. Port: "p80",
  629. }
  630. makeServiceMap(fp,
  631. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  632. svc.Spec.Type = "LoadBalancer"
  633. svc.Spec.ClusterIP = svcIP
  634. svc.Spec.Ports = []v1.ServicePort{{
  635. Name: svcPortName.Port,
  636. Port: int32(svcPort),
  637. Protocol: v1.ProtocolTCP,
  638. NodePort: int32(svcNodePort),
  639. }}
  640. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  641. IP: svcLBIP,
  642. }}
  643. }),
  644. )
  645. epIP := "10.180.0.1"
  646. makeEndpointsMap(fp,
  647. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  648. ept.Subsets = []v1.EndpointSubset{{
  649. Addresses: []v1.EndpointAddress{{
  650. IP: epIP,
  651. }},
  652. Ports: []v1.EndpointPort{{
  653. Name: svcPortName.Port,
  654. Port: int32(svcPort),
  655. }},
  656. }}
  657. }),
  658. )
  659. fp.syncProxyRules()
  660. proto := strings.ToLower(string(v1.ProtocolTCP))
  661. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  662. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  663. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  664. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  665. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  666. }
  667. fwRules := ipt.GetRules(fwChain)
  668. if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  669. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
  670. }
  671. }
  672. func TestNodePort(t *testing.T) {
  673. ipt := iptablestest.NewFake()
  674. fp := NewFakeProxier(ipt)
  675. svcIP := "10.20.30.41"
  676. svcPort := 80
  677. svcNodePort := 3001
  678. svcPortName := proxy.ServicePortName{
  679. NamespacedName: makeNSN("ns1", "svc1"),
  680. Port: "p80",
  681. }
  682. makeServiceMap(fp,
  683. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  684. svc.Spec.Type = "NodePort"
  685. svc.Spec.ClusterIP = svcIP
  686. svc.Spec.Ports = []v1.ServicePort{{
  687. Name: svcPortName.Port,
  688. Port: int32(svcPort),
  689. Protocol: v1.ProtocolTCP,
  690. NodePort: int32(svcNodePort),
  691. }}
  692. }),
  693. )
  694. epIP := "10.180.0.1"
  695. makeEndpointsMap(fp,
  696. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  697. ept.Subsets = []v1.EndpointSubset{{
  698. Addresses: []v1.EndpointAddress{{
  699. IP: epIP,
  700. }},
  701. Ports: []v1.EndpointPort{{
  702. Name: svcPortName.Port,
  703. Port: int32(svcPort),
  704. }},
  705. }}
  706. }),
  707. )
  708. itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
  709. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "127.0.0.1/16"}}
  710. itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
  711. addrs1 := []net.Addr{utilproxytest.AddrStruct{Val: "::1/128"}}
  712. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  713. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
  714. fp.nodePortAddresses = []string{}
  715. fp.syncProxyRules()
  716. proto := strings.ToLower(string(v1.ProtocolTCP))
  717. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  718. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  719. if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
  720. errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
  721. }
  722. }
  723. func TestExternalIPsReject(t *testing.T) {
  724. ipt := iptablestest.NewFake()
  725. fp := NewFakeProxier(ipt)
  726. svcIP := "10.20.30.41"
  727. svcPort := 80
  728. svcExternalIPs := "50.60.70.81"
  729. svcPortName := proxy.ServicePortName{
  730. NamespacedName: makeNSN("ns1", "svc1"),
  731. Port: "p80",
  732. }
  733. makeServiceMap(fp,
  734. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  735. svc.Spec.Type = "ClusterIP"
  736. svc.Spec.ClusterIP = svcIP
  737. svc.Spec.ExternalIPs = []string{svcExternalIPs}
  738. svc.Spec.Ports = []v1.ServicePort{{
  739. Name: svcPortName.Port,
  740. Port: int32(svcPort),
  741. Protocol: v1.ProtocolTCP,
  742. TargetPort: intstr.FromInt(svcPort),
  743. }}
  744. }),
  745. )
  746. makeEndpointsMap(fp)
  747. fp.syncProxyRules()
  748. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  749. if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
  750. errorf(fmt.Sprintf("Failed to find a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  751. }
  752. }
  753. func TestNodePortReject(t *testing.T) {
  754. ipt := iptablestest.NewFake()
  755. fp := NewFakeProxier(ipt)
  756. svcIP := "10.20.30.41"
  757. svcPort := 80
  758. svcNodePort := 3001
  759. svcPortName := proxy.ServicePortName{
  760. NamespacedName: makeNSN("ns1", "svc1"),
  761. Port: "p80",
  762. }
  763. makeServiceMap(fp,
  764. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  765. svc.Spec.Type = "NodePort"
  766. svc.Spec.ClusterIP = svcIP
  767. svc.Spec.Ports = []v1.ServicePort{{
  768. Name: svcPortName.Port,
  769. Port: int32(svcPort),
  770. Protocol: v1.ProtocolTCP,
  771. NodePort: int32(svcNodePort),
  772. }}
  773. }),
  774. )
  775. makeEndpointsMap(fp)
  776. fp.syncProxyRules()
  777. kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
  778. if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
  779. errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
  780. }
  781. }
  782. func TestOnlyLocalLoadBalancing(t *testing.T) {
  783. ipt := iptablestest.NewFake()
  784. fp := NewFakeProxier(ipt)
  785. svcIP := "10.20.30.41"
  786. svcPort := 80
  787. svcNodePort := 3001
  788. svcLBIP := "1.2.3.4"
  789. svcPortName := proxy.ServicePortName{
  790. NamespacedName: makeNSN("ns1", "svc1"),
  791. Port: "p80",
  792. }
  793. svcSessionAffinityTimeout := int32(10800)
  794. makeServiceMap(fp,
  795. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  796. svc.Spec.Type = "LoadBalancer"
  797. svc.Spec.ClusterIP = svcIP
  798. svc.Spec.Ports = []v1.ServicePort{{
  799. Name: svcPortName.Port,
  800. Port: int32(svcPort),
  801. Protocol: v1.ProtocolTCP,
  802. NodePort: int32(svcNodePort),
  803. }}
  804. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  805. IP: svcLBIP,
  806. }}
  807. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  808. svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  809. svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
  810. ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
  811. }
  812. }),
  813. )
  814. epIP1 := "10.180.0.1"
  815. epIP2 := "10.180.2.1"
  816. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  817. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  818. makeEndpointsMap(fp,
  819. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  820. ept.Subsets = []v1.EndpointSubset{{
  821. Addresses: []v1.EndpointAddress{{
  822. IP: epIP1,
  823. NodeName: nil,
  824. }, {
  825. IP: epIP2,
  826. NodeName: utilpointer.StringPtr(testHostname),
  827. }},
  828. Ports: []v1.EndpointPort{{
  829. Name: svcPortName.Port,
  830. Port: int32(svcPort),
  831. }},
  832. }}
  833. }),
  834. )
  835. fp.syncProxyRules()
  836. proto := strings.ToLower(string(v1.ProtocolTCP))
  837. fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
  838. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  839. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrLocal))
  840. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(v1.ProtocolTCP)), epStrNonLocal))
  841. kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
  842. if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
  843. errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
  844. }
  845. fwRules := ipt.GetRules(fwChain)
  846. if !hasJump(fwRules, lbChain, "", 0) {
  847. errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
  848. }
  849. if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
  850. errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
  851. }
  852. lbRules := ipt.GetRules(lbChain)
  853. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  854. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  855. }
  856. if !hasJump(lbRules, localEpChain, "", 0) {
  857. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
  858. }
  859. if !hasSessionAffinityRule(lbRules) {
  860. errorf(fmt.Sprintf("Didn't find session affinity rule from lb chain %v", lbChain), lbRules, t)
  861. }
  862. }
  863. func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
  864. ipt := iptablestest.NewFake()
  865. fp := NewFakeProxier(ipt)
  866. // set cluster CIDR to empty before test
  867. fp.clusterCIDR = ""
  868. onlyLocalNodePorts(t, fp, ipt)
  869. }
  870. func TestOnlyLocalNodePorts(t *testing.T) {
  871. ipt := iptablestest.NewFake()
  872. fp := NewFakeProxier(ipt)
  873. onlyLocalNodePorts(t, fp, ipt)
  874. }
  875. func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
  876. svcIP := "10.20.30.41"
  877. svcPort := 80
  878. svcNodePort := 3001
  879. svcPortName := proxy.ServicePortName{
  880. NamespacedName: makeNSN("ns1", "svc1"),
  881. Port: "p80",
  882. }
  883. makeServiceMap(fp,
  884. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  885. svc.Spec.Type = "NodePort"
  886. svc.Spec.ClusterIP = svcIP
  887. svc.Spec.Ports = []v1.ServicePort{{
  888. Name: svcPortName.Port,
  889. Port: int32(svcPort),
  890. Protocol: v1.ProtocolTCP,
  891. NodePort: int32(svcNodePort),
  892. }}
  893. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  894. }),
  895. )
  896. epIP1 := "10.180.0.1"
  897. epIP2 := "10.180.2.1"
  898. epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
  899. epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
  900. makeEndpointsMap(fp,
  901. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  902. ept.Subsets = []v1.EndpointSubset{{
  903. Addresses: []v1.EndpointAddress{{
  904. IP: epIP1,
  905. NodeName: nil,
  906. }, {
  907. IP: epIP2,
  908. NodeName: utilpointer.StringPtr(testHostname),
  909. }},
  910. Ports: []v1.EndpointPort{{
  911. Name: svcPortName.Port,
  912. Port: int32(svcPort),
  913. }},
  914. }}
  915. }),
  916. )
  917. itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
  918. addrs := []net.Addr{utilproxytest.AddrStruct{Val: "10.20.30.51/24"}}
  919. fp.networkInterfacer.(*utilproxytest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  920. fp.nodePortAddresses = []string{"10.20.30.0/24"}
  921. fp.syncProxyRules()
  922. proto := strings.ToLower(string(v1.ProtocolTCP))
  923. lbChain := string(serviceLBChainName(svcPortName.String(), proto))
  924. nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrLocal))
  925. localEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrNonLocal))
  926. kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
  927. if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
  928. errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
  929. }
  930. if !hasJump(kubeNodePortRules, string(KubeMarkMasqChain), "", svcNodePort) {
  931. errorf(fmt.Sprintf("Failed to find jump to %s chain for destination IP %d", KubeMarkMasqChain, svcNodePort), kubeNodePortRules, t)
  932. }
  933. kubeServiceRules := ipt.GetRules(string(kubeServicesChain))
  934. if !hasJump(kubeServiceRules, string(kubeNodePortsChain), "10.20.30.51", 0) {
  935. errorf(fmt.Sprintf("Failed to find jump to KUBE-NODEPORTS chain %v", string(kubeNodePortsChain)), kubeServiceRules, t)
  936. }
  937. svcChain := string(servicePortChainName(svcPortName.String(), proto))
  938. lbRules := ipt.GetRules(lbChain)
  939. if hasJump(lbRules, nonLocalEpChain, "", 0) {
  940. errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
  941. }
  942. if !hasJump(lbRules, svcChain, "", 0) || !hasSrcType(lbRules, "LOCAL") {
  943. errorf(fmt.Sprintf("Did not find jump from lb chain %v to svc %v with src-type LOCAL", lbChain, svcChain), lbRules, t)
  944. }
  945. if !hasJump(lbRules, localEpChain, "", 0) {
  946. errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
  947. }
  948. }
  949. func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
  950. svc := &v1.Service{
  951. ObjectMeta: metav1.ObjectMeta{
  952. Name: name,
  953. Namespace: namespace,
  954. Annotations: map[string]string{},
  955. },
  956. Spec: v1.ServiceSpec{},
  957. Status: v1.ServiceStatus{},
  958. }
  959. svcFunc(svc)
  960. return svc
  961. }
  962. func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
  963. svcPort := v1.ServicePort{
  964. Name: name,
  965. Protocol: protocol,
  966. Port: port,
  967. NodePort: nodeport,
  968. TargetPort: intstr.FromInt(targetPort),
  969. }
  970. return append(array, svcPort)
  971. }
  972. func TestBuildServiceMapAddRemove(t *testing.T) {
  973. ipt := iptablestest.NewFake()
  974. fp := NewFakeProxier(ipt)
  975. services := []*v1.Service{
  976. makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  977. svc.Spec.Type = v1.ServiceTypeClusterIP
  978. svc.Spec.ClusterIP = "172.16.55.4"
  979. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  980. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  981. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpport", "SCTP", 1236, 6321, 0)
  982. }),
  983. makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
  984. svc.Spec.Type = v1.ServiceTypeNodePort
  985. svc.Spec.ClusterIP = "172.16.55.10"
  986. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
  987. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
  988. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "muchmoreblah", "SCTP", 343, 676, 0)
  989. }),
  990. makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
  991. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  992. svc.Spec.ClusterIP = "172.16.55.11"
  993. svc.Spec.LoadBalancerIP = "5.6.7.8"
  994. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
  995. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
  996. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  997. Ingress: []v1.LoadBalancerIngress{
  998. {IP: "10.1.2.4"},
  999. },
  1000. }
  1001. }),
  1002. makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
  1003. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1004. svc.Spec.ClusterIP = "172.16.55.12"
  1005. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1006. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
  1007. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
  1008. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1009. Ingress: []v1.LoadBalancerIngress{
  1010. {IP: "10.1.2.3"},
  1011. },
  1012. }
  1013. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1014. svc.Spec.HealthCheckNodePort = 345
  1015. }),
  1016. }
  1017. for i := range services {
  1018. fp.OnServiceAdd(services[i])
  1019. }
  1020. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1021. if len(fp.serviceMap) != 10 {
  1022. t.Errorf("expected service map length 10, got %v", fp.serviceMap)
  1023. }
  1024. // The only-local-loadbalancer ones get added
  1025. if len(result.HCServiceNodePorts) != 1 {
  1026. t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
  1027. } else {
  1028. nsn := makeNSN("somewhere", "only-local-load-balancer")
  1029. if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
  1030. t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
  1031. }
  1032. }
  1033. if len(result.UDPStaleClusterIP) != 0 {
  1034. // Services only added, so nothing stale yet
  1035. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1036. }
  1037. // Remove some stuff
  1038. // oneService is a modification of services[0] with removed first port.
  1039. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1040. svc.Spec.Type = v1.ServiceTypeClusterIP
  1041. svc.Spec.ClusterIP = "172.16.55.4"
  1042. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1043. })
  1044. fp.OnServiceUpdate(services[0], oneService)
  1045. fp.OnServiceDelete(services[1])
  1046. fp.OnServiceDelete(services[2])
  1047. fp.OnServiceDelete(services[3])
  1048. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1049. if len(fp.serviceMap) != 1 {
  1050. t.Errorf("expected service map length 1, got %v", fp.serviceMap)
  1051. }
  1052. if len(result.HCServiceNodePorts) != 0 {
  1053. t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
  1054. }
  1055. // All services but one were deleted. While you'd expect only the ClusterIPs
  1056. // from the three deleted services here, we still have the ClusterIP for
  1057. // the not-deleted service, because one of it's ServicePorts was deleted.
  1058. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
  1059. if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
  1060. t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
  1061. }
  1062. for _, ip := range expectedStaleUDPServices {
  1063. if !result.UDPStaleClusterIP.Has(ip) {
  1064. t.Errorf("expected stale UDP service service %s", ip)
  1065. }
  1066. }
  1067. }
  1068. func TestBuildServiceMapServiceHeadless(t *testing.T) {
  1069. ipt := iptablestest.NewFake()
  1070. fp := NewFakeProxier(ipt)
  1071. makeServiceMap(fp,
  1072. makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
  1073. svc.Spec.Type = v1.ServiceTypeClusterIP
  1074. svc.Spec.ClusterIP = v1.ClusterIPNone
  1075. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  1076. }),
  1077. makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
  1078. svc.Spec.Type = v1.ServiceTypeClusterIP
  1079. svc.Spec.ClusterIP = v1.ClusterIPNone
  1080. }),
  1081. )
  1082. // Headless service should be ignored
  1083. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1084. if len(fp.serviceMap) != 0 {
  1085. t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
  1086. }
  1087. // No proxied services, so no healthchecks
  1088. if len(result.HCServiceNodePorts) != 0 {
  1089. t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
  1090. }
  1091. if len(result.UDPStaleClusterIP) != 0 {
  1092. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1093. }
  1094. }
  1095. func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
  1096. ipt := iptablestest.NewFake()
  1097. fp := NewFakeProxier(ipt)
  1098. makeServiceMap(fp,
  1099. makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
  1100. svc.Spec.Type = v1.ServiceTypeExternalName
  1101. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  1102. svc.Spec.ExternalName = "foo2.bar.com"
  1103. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
  1104. }),
  1105. )
  1106. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1107. if len(fp.serviceMap) != 0 {
  1108. t.Errorf("expected service map length 0, got %v", fp.serviceMap)
  1109. }
  1110. // No proxied services, so no healthchecks
  1111. if len(result.HCServiceNodePorts) != 0 {
  1112. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1113. }
  1114. if len(result.UDPStaleClusterIP) != 0 {
  1115. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
  1116. }
  1117. }
  1118. func TestBuildServiceMapServiceUpdate(t *testing.T) {
  1119. ipt := iptablestest.NewFake()
  1120. fp := NewFakeProxier(ipt)
  1121. servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1122. svc.Spec.Type = v1.ServiceTypeClusterIP
  1123. svc.Spec.ClusterIP = "172.16.55.4"
  1124. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1125. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
  1126. })
  1127. servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1128. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1129. svc.Spec.ClusterIP = "172.16.55.4"
  1130. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1131. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
  1132. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
  1133. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1134. Ingress: []v1.LoadBalancerIngress{
  1135. {IP: "10.1.2.3"},
  1136. },
  1137. }
  1138. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1139. svc.Spec.HealthCheckNodePort = 345
  1140. })
  1141. fp.OnServiceAdd(servicev1)
  1142. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1143. if len(fp.serviceMap) != 2 {
  1144. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1145. }
  1146. if len(result.HCServiceNodePorts) != 0 {
  1147. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1148. }
  1149. if len(result.UDPStaleClusterIP) != 0 {
  1150. // Services only added, so nothing stale yet
  1151. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1152. }
  1153. // Change service to load-balancer
  1154. fp.OnServiceUpdate(servicev1, servicev2)
  1155. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1156. if len(fp.serviceMap) != 2 {
  1157. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1158. }
  1159. if len(result.HCServiceNodePorts) != 1 {
  1160. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1161. }
  1162. if len(result.UDPStaleClusterIP) != 0 {
  1163. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1164. }
  1165. // No change; make sure the service map stays the same and there are
  1166. // no health-check changes
  1167. fp.OnServiceUpdate(servicev2, servicev2)
  1168. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1169. if len(fp.serviceMap) != 2 {
  1170. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1171. }
  1172. if len(result.HCServiceNodePorts) != 1 {
  1173. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1174. }
  1175. if len(result.UDPStaleClusterIP) != 0 {
  1176. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
  1177. }
  1178. // And back to ClusterIP
  1179. fp.OnServiceUpdate(servicev2, servicev1)
  1180. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1181. if len(fp.serviceMap) != 2 {
  1182. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1183. }
  1184. if len(result.HCServiceNodePorts) != 0 {
  1185. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1186. }
  1187. if len(result.UDPStaleClusterIP) != 0 {
  1188. // Services only added, so nothing stale yet
  1189. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1190. }
  1191. }
  1192. func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
  1193. ept := &v1.Endpoints{
  1194. ObjectMeta: metav1.ObjectMeta{
  1195. Name: name,
  1196. Namespace: namespace,
  1197. },
  1198. }
  1199. eptFunc(ept)
  1200. return ept
  1201. }
  1202. func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
  1203. for i := range allEndpoints {
  1204. proxier.OnEndpointsAdd(allEndpoints[i])
  1205. }
  1206. proxier.mu.Lock()
  1207. defer proxier.mu.Unlock()
  1208. proxier.endpointsSynced = true
  1209. }
  1210. func makeNSN(namespace, name string) types.NamespacedName {
  1211. return types.NamespacedName{Namespace: namespace, Name: name}
  1212. }
  1213. func makeServicePortName(ns, name, port string) proxy.ServicePortName {
  1214. return proxy.ServicePortName{
  1215. NamespacedName: makeNSN(ns, name),
  1216. Port: port,
  1217. }
  1218. }
  1219. func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
  1220. for i := range allServices {
  1221. proxier.OnServiceAdd(allServices[i])
  1222. }
  1223. proxier.mu.Lock()
  1224. defer proxier.mu.Unlock()
  1225. proxier.servicesSynced = true
  1226. }
  1227. func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
  1228. if len(newMap) != len(expected) {
  1229. t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
  1230. }
  1231. for x := range expected {
  1232. if len(newMap[x]) != len(expected[x]) {
  1233. t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
  1234. } else {
  1235. for i := range expected[x] {
  1236. newEp, ok := newMap[x][i].(*endpointsInfo)
  1237. if !ok {
  1238. t.Errorf("Failed to cast endpointsInfo")
  1239. continue
  1240. }
  1241. if newEp.Endpoint != expected[x][i].Endpoint ||
  1242. newEp.IsLocal != expected[x][i].IsLocal ||
  1243. newEp.protocol != expected[x][i].protocol ||
  1244. newEp.chainName != expected[x][i].chainName {
  1245. t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
  1246. }
  1247. }
  1248. }
  1249. }
  1250. }
  1251. func Test_updateEndpointsMap(t *testing.T) {
  1252. var nodeName = testHostname
  1253. emptyEndpoint := func(ept *v1.Endpoints) {
  1254. ept.Subsets = []v1.EndpointSubset{}
  1255. }
  1256. unnamedPort := func(ept *v1.Endpoints) {
  1257. ept.Subsets = []v1.EndpointSubset{{
  1258. Addresses: []v1.EndpointAddress{{
  1259. IP: "1.1.1.1",
  1260. }},
  1261. Ports: []v1.EndpointPort{{
  1262. Port: 11,
  1263. }},
  1264. }}
  1265. }
  1266. unnamedPortLocal := func(ept *v1.Endpoints) {
  1267. ept.Subsets = []v1.EndpointSubset{{
  1268. Addresses: []v1.EndpointAddress{{
  1269. IP: "1.1.1.1",
  1270. NodeName: &nodeName,
  1271. }},
  1272. Ports: []v1.EndpointPort{{
  1273. Port: 11,
  1274. }},
  1275. }}
  1276. }
  1277. namedPortLocal := func(ept *v1.Endpoints) {
  1278. ept.Subsets = []v1.EndpointSubset{{
  1279. Addresses: []v1.EndpointAddress{{
  1280. IP: "1.1.1.1",
  1281. NodeName: &nodeName,
  1282. }},
  1283. Ports: []v1.EndpointPort{{
  1284. Name: "p11",
  1285. Port: 11,
  1286. }},
  1287. }}
  1288. }
  1289. namedPort := func(ept *v1.Endpoints) {
  1290. ept.Subsets = []v1.EndpointSubset{{
  1291. Addresses: []v1.EndpointAddress{{
  1292. IP: "1.1.1.1",
  1293. }},
  1294. Ports: []v1.EndpointPort{{
  1295. Name: "p11",
  1296. Port: 11,
  1297. }},
  1298. }}
  1299. }
  1300. namedPortRenamed := func(ept *v1.Endpoints) {
  1301. ept.Subsets = []v1.EndpointSubset{{
  1302. Addresses: []v1.EndpointAddress{{
  1303. IP: "1.1.1.1",
  1304. }},
  1305. Ports: []v1.EndpointPort{{
  1306. Name: "p11-2",
  1307. Port: 11,
  1308. }},
  1309. }}
  1310. }
  1311. namedPortRenumbered := func(ept *v1.Endpoints) {
  1312. ept.Subsets = []v1.EndpointSubset{{
  1313. Addresses: []v1.EndpointAddress{{
  1314. IP: "1.1.1.1",
  1315. }},
  1316. Ports: []v1.EndpointPort{{
  1317. Name: "p11",
  1318. Port: 22,
  1319. }},
  1320. }}
  1321. }
  1322. namedPortsLocalNoLocal := func(ept *v1.Endpoints) {
  1323. ept.Subsets = []v1.EndpointSubset{{
  1324. Addresses: []v1.EndpointAddress{{
  1325. IP: "1.1.1.1",
  1326. }, {
  1327. IP: "1.1.1.2",
  1328. NodeName: &nodeName,
  1329. }},
  1330. Ports: []v1.EndpointPort{{
  1331. Name: "p11",
  1332. Port: 11,
  1333. }, {
  1334. Name: "p12",
  1335. Port: 12,
  1336. }},
  1337. }}
  1338. }
  1339. multipleSubsets := func(ept *v1.Endpoints) {
  1340. ept.Subsets = []v1.EndpointSubset{{
  1341. Addresses: []v1.EndpointAddress{{
  1342. IP: "1.1.1.1",
  1343. }},
  1344. Ports: []v1.EndpointPort{{
  1345. Name: "p11",
  1346. Port: 11,
  1347. }},
  1348. }, {
  1349. Addresses: []v1.EndpointAddress{{
  1350. IP: "1.1.1.2",
  1351. }},
  1352. Ports: []v1.EndpointPort{{
  1353. Name: "p12",
  1354. Port: 12,
  1355. }},
  1356. }}
  1357. }
  1358. multipleSubsetsWithLocal := func(ept *v1.Endpoints) {
  1359. ept.Subsets = []v1.EndpointSubset{{
  1360. Addresses: []v1.EndpointAddress{{
  1361. IP: "1.1.1.1",
  1362. }},
  1363. Ports: []v1.EndpointPort{{
  1364. Name: "p11",
  1365. Port: 11,
  1366. }},
  1367. }, {
  1368. Addresses: []v1.EndpointAddress{{
  1369. IP: "1.1.1.2",
  1370. NodeName: &nodeName,
  1371. }},
  1372. Ports: []v1.EndpointPort{{
  1373. Name: "p12",
  1374. Port: 12,
  1375. }},
  1376. }}
  1377. }
  1378. multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) {
  1379. ept.Subsets = []v1.EndpointSubset{{
  1380. Addresses: []v1.EndpointAddress{{
  1381. IP: "1.1.1.1",
  1382. NodeName: &nodeName,
  1383. }},
  1384. Ports: []v1.EndpointPort{{
  1385. Name: "p11",
  1386. Port: 11,
  1387. }, {
  1388. Name: "p12",
  1389. Port: 12,
  1390. }},
  1391. }, {
  1392. Addresses: []v1.EndpointAddress{{
  1393. IP: "1.1.1.3",
  1394. }},
  1395. Ports: []v1.EndpointPort{{
  1396. Name: "p13",
  1397. Port: 13,
  1398. }},
  1399. }}
  1400. }
  1401. multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) {
  1402. ept.Subsets = []v1.EndpointSubset{{
  1403. Addresses: []v1.EndpointAddress{{
  1404. IP: "1.1.1.1",
  1405. }, {
  1406. IP: "1.1.1.2",
  1407. NodeName: &nodeName,
  1408. }},
  1409. Ports: []v1.EndpointPort{{
  1410. Name: "p11",
  1411. Port: 11,
  1412. }, {
  1413. Name: "p12",
  1414. Port: 12,
  1415. }},
  1416. }, {
  1417. Addresses: []v1.EndpointAddress{{
  1418. IP: "1.1.1.3",
  1419. }, {
  1420. IP: "1.1.1.4",
  1421. NodeName: &nodeName,
  1422. }},
  1423. Ports: []v1.EndpointPort{{
  1424. Name: "p13",
  1425. Port: 13,
  1426. }, {
  1427. Name: "p14",
  1428. Port: 14,
  1429. }},
  1430. }}
  1431. }
  1432. multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) {
  1433. ept.Subsets = []v1.EndpointSubset{{
  1434. Addresses: []v1.EndpointAddress{{
  1435. IP: "2.2.2.1",
  1436. }, {
  1437. IP: "2.2.2.2",
  1438. NodeName: &nodeName,
  1439. }},
  1440. Ports: []v1.EndpointPort{{
  1441. Name: "p21",
  1442. Port: 21,
  1443. }, {
  1444. Name: "p22",
  1445. Port: 22,
  1446. }},
  1447. }}
  1448. }
  1449. complexBefore1 := func(ept *v1.Endpoints) {
  1450. ept.Subsets = []v1.EndpointSubset{{
  1451. Addresses: []v1.EndpointAddress{{
  1452. IP: "1.1.1.1",
  1453. }},
  1454. Ports: []v1.EndpointPort{{
  1455. Name: "p11",
  1456. Port: 11,
  1457. }},
  1458. }}
  1459. }
  1460. complexBefore2 := func(ept *v1.Endpoints) {
  1461. ept.Subsets = []v1.EndpointSubset{{
  1462. Addresses: []v1.EndpointAddress{{
  1463. IP: "2.2.2.2",
  1464. NodeName: &nodeName,
  1465. }, {
  1466. IP: "2.2.2.22",
  1467. NodeName: &nodeName,
  1468. }},
  1469. Ports: []v1.EndpointPort{{
  1470. Name: "p22",
  1471. Port: 22,
  1472. }},
  1473. }, {
  1474. Addresses: []v1.EndpointAddress{{
  1475. IP: "2.2.2.3",
  1476. NodeName: &nodeName,
  1477. }},
  1478. Ports: []v1.EndpointPort{{
  1479. Name: "p23",
  1480. Port: 23,
  1481. }},
  1482. }}
  1483. }
  1484. complexBefore4 := func(ept *v1.Endpoints) {
  1485. ept.Subsets = []v1.EndpointSubset{{
  1486. Addresses: []v1.EndpointAddress{{
  1487. IP: "4.4.4.4",
  1488. NodeName: &nodeName,
  1489. }, {
  1490. IP: "4.4.4.5",
  1491. NodeName: &nodeName,
  1492. }},
  1493. Ports: []v1.EndpointPort{{
  1494. Name: "p44",
  1495. Port: 44,
  1496. }},
  1497. }, {
  1498. Addresses: []v1.EndpointAddress{{
  1499. IP: "4.4.4.6",
  1500. NodeName: &nodeName,
  1501. }},
  1502. Ports: []v1.EndpointPort{{
  1503. Name: "p45",
  1504. Port: 45,
  1505. }},
  1506. }}
  1507. }
  1508. complexAfter1 := func(ept *v1.Endpoints) {
  1509. ept.Subsets = []v1.EndpointSubset{{
  1510. Addresses: []v1.EndpointAddress{{
  1511. IP: "1.1.1.1",
  1512. }, {
  1513. IP: "1.1.1.11",
  1514. }},
  1515. Ports: []v1.EndpointPort{{
  1516. Name: "p11",
  1517. Port: 11,
  1518. }},
  1519. }, {
  1520. Addresses: []v1.EndpointAddress{{
  1521. IP: "1.1.1.2",
  1522. }},
  1523. Ports: []v1.EndpointPort{{
  1524. Name: "p12",
  1525. Port: 12,
  1526. }, {
  1527. Name: "p122",
  1528. Port: 122,
  1529. }},
  1530. }}
  1531. }
  1532. complexAfter3 := func(ept *v1.Endpoints) {
  1533. ept.Subsets = []v1.EndpointSubset{{
  1534. Addresses: []v1.EndpointAddress{{
  1535. IP: "3.3.3.3",
  1536. }},
  1537. Ports: []v1.EndpointPort{{
  1538. Name: "p33",
  1539. Port: 33,
  1540. }},
  1541. }}
  1542. }
  1543. complexAfter4 := func(ept *v1.Endpoints) {
  1544. ept.Subsets = []v1.EndpointSubset{{
  1545. Addresses: []v1.EndpointAddress{{
  1546. IP: "4.4.4.4",
  1547. NodeName: &nodeName,
  1548. }},
  1549. Ports: []v1.EndpointPort{{
  1550. Name: "p44",
  1551. Port: 44,
  1552. }},
  1553. }}
  1554. }
  1555. testCases := []struct {
  1556. // previousEndpoints and currentEndpoints are used to call appropriate
  1557. // handlers OnEndpoints* (based on whether corresponding values are nil
  1558. // or non-nil) and must be of equal length.
  1559. previousEndpoints []*v1.Endpoints
  1560. currentEndpoints []*v1.Endpoints
  1561. oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
  1562. expectedResult map[proxy.ServicePortName][]*endpointsInfo
  1563. expectedStaleEndpoints []proxy.ServiceEndpoint
  1564. expectedStaleServiceNames map[proxy.ServicePortName]bool
  1565. expectedHealthchecks map[types.NamespacedName]int
  1566. }{{
  1567. // Case[0]: nothing
  1568. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1569. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1570. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1571. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1572. expectedHealthchecks: map[types.NamespacedName]int{},
  1573. }, {
  1574. // Case[1]: no change, unnamed port
  1575. previousEndpoints: []*v1.Endpoints{
  1576. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1577. },
  1578. currentEndpoints: []*v1.Endpoints{
  1579. makeTestEndpoints("ns1", "ep1", unnamedPort),
  1580. },
  1581. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1582. makeServicePortName("ns1", "ep1", ""): {
  1583. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1584. },
  1585. },
  1586. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1587. makeServicePortName("ns1", "ep1", ""): {
  1588. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1589. },
  1590. },
  1591. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1592. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1593. expectedHealthchecks: map[types.NamespacedName]int{},
  1594. }, {
  1595. // Case[2]: no change, named port, local
  1596. previousEndpoints: []*v1.Endpoints{
  1597. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1598. },
  1599. currentEndpoints: []*v1.Endpoints{
  1600. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  1601. },
  1602. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1603. makeServicePortName("ns1", "ep1", "p11"): {
  1604. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1605. },
  1606. },
  1607. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1608. makeServicePortName("ns1", "ep1", "p11"): {
  1609. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1610. },
  1611. },
  1612. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1613. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1614. expectedHealthchecks: map[types.NamespacedName]int{
  1615. makeNSN("ns1", "ep1"): 1,
  1616. },
  1617. }, {
  1618. // Case[3]: no change, multiple subsets
  1619. previousEndpoints: []*v1.Endpoints{
  1620. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1621. },
  1622. currentEndpoints: []*v1.Endpoints{
  1623. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1624. },
  1625. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1626. makeServicePortName("ns1", "ep1", "p11"): {
  1627. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1628. },
  1629. makeServicePortName("ns1", "ep1", "p12"): {
  1630. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1631. },
  1632. },
  1633. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1634. makeServicePortName("ns1", "ep1", "p11"): {
  1635. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1636. },
  1637. makeServicePortName("ns1", "ep1", "p12"): {
  1638. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1639. },
  1640. },
  1641. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1642. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1643. expectedHealthchecks: map[types.NamespacedName]int{},
  1644. }, {
  1645. // Case[4]: no change, multiple subsets, multiple ports, local
  1646. previousEndpoints: []*v1.Endpoints{
  1647. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1648. },
  1649. currentEndpoints: []*v1.Endpoints{
  1650. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  1651. },
  1652. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1653. makeServicePortName("ns1", "ep1", "p11"): {
  1654. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1655. },
  1656. makeServicePortName("ns1", "ep1", "p12"): {
  1657. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1658. },
  1659. makeServicePortName("ns1", "ep1", "p13"): {
  1660. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1661. },
  1662. },
  1663. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1664. makeServicePortName("ns1", "ep1", "p11"): {
  1665. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1666. },
  1667. makeServicePortName("ns1", "ep1", "p12"): {
  1668. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: true}},
  1669. },
  1670. makeServicePortName("ns1", "ep1", "p13"): {
  1671. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1672. },
  1673. },
  1674. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1675. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1676. expectedHealthchecks: map[types.NamespacedName]int{
  1677. makeNSN("ns1", "ep1"): 1,
  1678. },
  1679. }, {
  1680. // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
  1681. previousEndpoints: []*v1.Endpoints{
  1682. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1683. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1684. },
  1685. currentEndpoints: []*v1.Endpoints{
  1686. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  1687. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  1688. },
  1689. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1690. makeServicePortName("ns1", "ep1", "p11"): {
  1691. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1692. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1693. },
  1694. makeServicePortName("ns1", "ep1", "p12"): {
  1695. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1696. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1697. },
  1698. makeServicePortName("ns1", "ep1", "p13"): {
  1699. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1700. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1701. },
  1702. makeServicePortName("ns1", "ep1", "p14"): {
  1703. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1704. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1705. },
  1706. makeServicePortName("ns2", "ep2", "p21"): {
  1707. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1708. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1709. },
  1710. makeServicePortName("ns2", "ep2", "p22"): {
  1711. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1712. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1713. },
  1714. },
  1715. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1716. makeServicePortName("ns1", "ep1", "p11"): {
  1717. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1718. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1719. },
  1720. makeServicePortName("ns1", "ep1", "p12"): {
  1721. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1722. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1723. },
  1724. makeServicePortName("ns1", "ep1", "p13"): {
  1725. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:13", IsLocal: false}},
  1726. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:13", IsLocal: true}},
  1727. },
  1728. makeServicePortName("ns1", "ep1", "p14"): {
  1729. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.3:14", IsLocal: false}},
  1730. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.4:14", IsLocal: true}},
  1731. },
  1732. makeServicePortName("ns2", "ep2", "p21"): {
  1733. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:21", IsLocal: false}},
  1734. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:21", IsLocal: true}},
  1735. },
  1736. makeServicePortName("ns2", "ep2", "p22"): {
  1737. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.1:22", IsLocal: false}},
  1738. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1739. },
  1740. },
  1741. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1742. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1743. expectedHealthchecks: map[types.NamespacedName]int{
  1744. makeNSN("ns1", "ep1"): 2,
  1745. makeNSN("ns2", "ep2"): 1,
  1746. },
  1747. }, {
  1748. // Case[6]: add an Endpoints
  1749. previousEndpoints: []*v1.Endpoints{
  1750. nil,
  1751. },
  1752. currentEndpoints: []*v1.Endpoints{
  1753. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1754. },
  1755. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  1756. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1757. makeServicePortName("ns1", "ep1", ""): {
  1758. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1759. },
  1760. },
  1761. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1762. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1763. makeServicePortName("ns1", "ep1", ""): true,
  1764. },
  1765. expectedHealthchecks: map[types.NamespacedName]int{
  1766. makeNSN("ns1", "ep1"): 1,
  1767. },
  1768. }, {
  1769. // Case[7]: remove an Endpoints
  1770. previousEndpoints: []*v1.Endpoints{
  1771. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  1772. },
  1773. currentEndpoints: []*v1.Endpoints{
  1774. nil,
  1775. },
  1776. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1777. makeServicePortName("ns1", "ep1", ""): {
  1778. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: true}},
  1779. },
  1780. },
  1781. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
  1782. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1783. Endpoint: "1.1.1.1:11",
  1784. ServicePortName: makeServicePortName("ns1", "ep1", ""),
  1785. }},
  1786. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1787. expectedHealthchecks: map[types.NamespacedName]int{},
  1788. }, {
  1789. // Case[8]: add an IP and port
  1790. previousEndpoints: []*v1.Endpoints{
  1791. makeTestEndpoints("ns1", "ep1", namedPort),
  1792. },
  1793. currentEndpoints: []*v1.Endpoints{
  1794. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1795. },
  1796. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1797. makeServicePortName("ns1", "ep1", "p11"): {
  1798. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1799. },
  1800. },
  1801. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1802. makeServicePortName("ns1", "ep1", "p11"): {
  1803. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1804. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1805. },
  1806. makeServicePortName("ns1", "ep1", "p12"): {
  1807. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1808. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1809. },
  1810. },
  1811. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1812. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1813. makeServicePortName("ns1", "ep1", "p12"): true,
  1814. },
  1815. expectedHealthchecks: map[types.NamespacedName]int{
  1816. makeNSN("ns1", "ep1"): 1,
  1817. },
  1818. }, {
  1819. // Case[9]: remove an IP and port
  1820. previousEndpoints: []*v1.Endpoints{
  1821. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  1822. },
  1823. currentEndpoints: []*v1.Endpoints{
  1824. makeTestEndpoints("ns1", "ep1", namedPort),
  1825. },
  1826. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1827. makeServicePortName("ns1", "ep1", "p11"): {
  1828. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1829. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:11", IsLocal: true}},
  1830. },
  1831. makeServicePortName("ns1", "ep1", "p12"): {
  1832. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:12", IsLocal: false}},
  1833. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1834. },
  1835. },
  1836. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1837. makeServicePortName("ns1", "ep1", "p11"): {
  1838. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1839. },
  1840. },
  1841. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1842. Endpoint: "1.1.1.2:11",
  1843. ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
  1844. }, {
  1845. Endpoint: "1.1.1.1:12",
  1846. ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
  1847. }, {
  1848. Endpoint: "1.1.1.2:12",
  1849. ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
  1850. }},
  1851. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1852. expectedHealthchecks: map[types.NamespacedName]int{},
  1853. }, {
  1854. // Case[10]: add a subset
  1855. previousEndpoints: []*v1.Endpoints{
  1856. makeTestEndpoints("ns1", "ep1", namedPort),
  1857. },
  1858. currentEndpoints: []*v1.Endpoints{
  1859. makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
  1860. },
  1861. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1862. makeServicePortName("ns1", "ep1", "p11"): {
  1863. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1864. },
  1865. },
  1866. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1867. makeServicePortName("ns1", "ep1", "p11"): {
  1868. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1869. },
  1870. makeServicePortName("ns1", "ep1", "p12"): {
  1871. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: true}},
  1872. },
  1873. },
  1874. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  1875. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1876. makeServicePortName("ns1", "ep1", "p12"): true,
  1877. },
  1878. expectedHealthchecks: map[types.NamespacedName]int{
  1879. makeNSN("ns1", "ep1"): 1,
  1880. },
  1881. }, {
  1882. // Case[11]: remove a subset
  1883. previousEndpoints: []*v1.Endpoints{
  1884. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  1885. },
  1886. currentEndpoints: []*v1.Endpoints{
  1887. makeTestEndpoints("ns1", "ep1", namedPort),
  1888. },
  1889. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1890. makeServicePortName("ns1", "ep1", "p11"): {
  1891. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1892. },
  1893. makeServicePortName("ns1", "ep1", "p12"): {
  1894. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1895. },
  1896. },
  1897. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1898. makeServicePortName("ns1", "ep1", "p11"): {
  1899. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1900. },
  1901. },
  1902. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1903. Endpoint: "1.1.1.2:12",
  1904. ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
  1905. }},
  1906. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1907. expectedHealthchecks: map[types.NamespacedName]int{},
  1908. }, {
  1909. // Case[12]: rename a port
  1910. previousEndpoints: []*v1.Endpoints{
  1911. makeTestEndpoints("ns1", "ep1", namedPort),
  1912. },
  1913. currentEndpoints: []*v1.Endpoints{
  1914. makeTestEndpoints("ns1", "ep1", namedPortRenamed),
  1915. },
  1916. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1917. makeServicePortName("ns1", "ep1", "p11"): {
  1918. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1919. },
  1920. },
  1921. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1922. makeServicePortName("ns1", "ep1", "p11-2"): {
  1923. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1924. },
  1925. },
  1926. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1927. Endpoint: "1.1.1.1:11",
  1928. ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
  1929. }},
  1930. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  1931. makeServicePortName("ns1", "ep1", "p11-2"): true,
  1932. },
  1933. expectedHealthchecks: map[types.NamespacedName]int{},
  1934. }, {
  1935. // Case[13]: renumber a port
  1936. previousEndpoints: []*v1.Endpoints{
  1937. makeTestEndpoints("ns1", "ep1", namedPort),
  1938. },
  1939. currentEndpoints: []*v1.Endpoints{
  1940. makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
  1941. },
  1942. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1943. makeServicePortName("ns1", "ep1", "p11"): {
  1944. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1945. },
  1946. },
  1947. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1948. makeServicePortName("ns1", "ep1", "p11"): {
  1949. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:22", IsLocal: false}},
  1950. },
  1951. },
  1952. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  1953. Endpoint: "1.1.1.1:11",
  1954. ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
  1955. }},
  1956. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  1957. expectedHealthchecks: map[types.NamespacedName]int{},
  1958. }, {
  1959. // Case[14]: complex add and remove
  1960. previousEndpoints: []*v1.Endpoints{
  1961. makeTestEndpoints("ns1", "ep1", complexBefore1),
  1962. makeTestEndpoints("ns2", "ep2", complexBefore2),
  1963. nil,
  1964. makeTestEndpoints("ns4", "ep4", complexBefore4),
  1965. },
  1966. currentEndpoints: []*v1.Endpoints{
  1967. makeTestEndpoints("ns1", "ep1", complexAfter1),
  1968. nil,
  1969. makeTestEndpoints("ns3", "ep3", complexAfter3),
  1970. makeTestEndpoints("ns4", "ep4", complexAfter4),
  1971. },
  1972. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
  1973. makeServicePortName("ns1", "ep1", "p11"): {
  1974. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1975. },
  1976. makeServicePortName("ns2", "ep2", "p22"): {
  1977. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.2:22", IsLocal: true}},
  1978. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.22:22", IsLocal: true}},
  1979. },
  1980. makeServicePortName("ns2", "ep2", "p23"): {
  1981. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "2.2.2.3:23", IsLocal: true}},
  1982. },
  1983. makeServicePortName("ns4", "ep4", "p44"): {
  1984. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  1985. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.5:44", IsLocal: true}},
  1986. },
  1987. makeServicePortName("ns4", "ep4", "p45"): {
  1988. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.6:45", IsLocal: true}},
  1989. },
  1990. },
  1991. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  1992. makeServicePortName("ns1", "ep1", "p11"): {
  1993. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  1994. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.11:11", IsLocal: false}},
  1995. },
  1996. makeServicePortName("ns1", "ep1", "p12"): {
  1997. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:12", IsLocal: false}},
  1998. },
  1999. makeServicePortName("ns1", "ep1", "p122"): {
  2000. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.2:122", IsLocal: false}},
  2001. },
  2002. makeServicePortName("ns3", "ep3", "p33"): {
  2003. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "3.3.3.3:33", IsLocal: false}},
  2004. },
  2005. makeServicePortName("ns4", "ep4", "p44"): {
  2006. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "4.4.4.4:44", IsLocal: true}},
  2007. },
  2008. },
  2009. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2010. Endpoint: "2.2.2.2:22",
  2011. ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
  2012. }, {
  2013. Endpoint: "2.2.2.22:22",
  2014. ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
  2015. }, {
  2016. Endpoint: "2.2.2.3:23",
  2017. ServicePortName: makeServicePortName("ns2", "ep2", "p23"),
  2018. }, {
  2019. Endpoint: "4.4.4.5:44",
  2020. ServicePortName: makeServicePortName("ns4", "ep4", "p44"),
  2021. }, {
  2022. Endpoint: "4.4.4.6:45",
  2023. ServicePortName: makeServicePortName("ns4", "ep4", "p45"),
  2024. }},
  2025. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2026. makeServicePortName("ns1", "ep1", "p12"): true,
  2027. makeServicePortName("ns1", "ep1", "p122"): true,
  2028. makeServicePortName("ns3", "ep3", "p33"): true,
  2029. },
  2030. expectedHealthchecks: map[types.NamespacedName]int{
  2031. makeNSN("ns4", "ep4"): 1,
  2032. },
  2033. }, {
  2034. // Case[15]: change from 0 endpoint address to 1 unnamed port
  2035. previousEndpoints: []*v1.Endpoints{
  2036. makeTestEndpoints("ns1", "ep1", emptyEndpoint),
  2037. },
  2038. currentEndpoints: []*v1.Endpoints{
  2039. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2040. },
  2041. oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
  2042. expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
  2043. makeServicePortName("ns1", "ep1", ""): {
  2044. {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "1.1.1.1:11", IsLocal: false}},
  2045. },
  2046. },
  2047. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2048. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2049. makeServicePortName("ns1", "ep1", ""): true,
  2050. },
  2051. expectedHealthchecks: map[types.NamespacedName]int{},
  2052. },
  2053. }
  2054. for tci, tc := range testCases {
  2055. ipt := iptablestest.NewFake()
  2056. fp := NewFakeProxier(ipt)
  2057. fp.hostname = nodeName
  2058. // First check that after adding all previous versions of endpoints,
  2059. // the fp.oldEndpoints is as we expect.
  2060. for i := range tc.previousEndpoints {
  2061. if tc.previousEndpoints[i] != nil {
  2062. fp.OnEndpointsAdd(tc.previousEndpoints[i])
  2063. }
  2064. }
  2065. fp.endpointsMap.Update(fp.endpointsChanges)
  2066. compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
  2067. // Now let's call appropriate handlers to get to state we want to be.
  2068. if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
  2069. t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
  2070. continue
  2071. }
  2072. for i := range tc.previousEndpoints {
  2073. prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
  2074. switch {
  2075. case prev == nil:
  2076. fp.OnEndpointsAdd(curr)
  2077. case curr == nil:
  2078. fp.OnEndpointsDelete(prev)
  2079. default:
  2080. fp.OnEndpointsUpdate(prev, curr)
  2081. }
  2082. }
  2083. result := fp.endpointsMap.Update(fp.endpointsChanges)
  2084. newMap := fp.endpointsMap
  2085. compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
  2086. if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
  2087. t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
  2088. }
  2089. for _, x := range tc.expectedStaleEndpoints {
  2090. found := false
  2091. for _, stale := range result.StaleEndpoints {
  2092. if stale == x {
  2093. found = true
  2094. break
  2095. }
  2096. }
  2097. if !found {
  2098. t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
  2099. }
  2100. }
  2101. if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
  2102. t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
  2103. }
  2104. for svcName := range tc.expectedStaleServiceNames {
  2105. found := false
  2106. for _, stale := range result.StaleServiceNames {
  2107. if stale == svcName {
  2108. found = true
  2109. }
  2110. }
  2111. if !found {
  2112. t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
  2113. }
  2114. }
  2115. if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
  2116. t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
  2117. }
  2118. }
  2119. }
  2120. // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.