proxier_test.go 114 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "reflect"
  19. "sort"
  20. "strings"
  21. "testing"
  22. "time"
  23. "github.com/stretchr/testify/assert"
  24. v1 "k8s.io/api/core/v1"
  25. discovery "k8s.io/api/discovery/v1beta1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. "k8s.io/apimachinery/pkg/util/intstr"
  29. "k8s.io/apimachinery/pkg/util/sets"
  30. "k8s.io/kubernetes/pkg/proxy"
  31. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  32. netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
  33. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  34. proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
  35. proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
  36. "k8s.io/kubernetes/pkg/util/async"
  37. utilipset "k8s.io/kubernetes/pkg/util/ipset"
  38. ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing"
  39. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  40. iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
  41. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  42. ipvstest "k8s.io/kubernetes/pkg/util/ipvs/testing"
  43. "k8s.io/utils/exec"
  44. fakeexec "k8s.io/utils/exec/testing"
  45. utilpointer "k8s.io/utils/pointer"
  46. )
  47. const testHostname = "test-hostname"
  48. type fakeIPGetter struct {
  49. nodeIPs []net.IP
  50. }
  51. func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) {
  52. return f.nodeIPs, nil
  53. }
  54. // fakePortOpener implements portOpener.
  55. type fakePortOpener struct {
  56. openPorts []*utilproxy.LocalPort
  57. }
  58. // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
  59. // to lock a local port.
  60. func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
  61. f.openPorts = append(f.openPorts, lp)
  62. return nil, nil
  63. }
  64. // fakeKernelHandler implements KernelHandler.
  65. type fakeKernelHandler struct {
  66. modules []string
  67. kernelVersion string
  68. }
  69. func (fake *fakeKernelHandler) GetModules() ([]string, error) {
  70. return fake.modules, nil
  71. }
  72. func (fake *fakeKernelHandler) GetKernelVersion() (string, error) {
  73. return fake.kernelVersion, nil
  74. }
  75. // fakeKernelHandler implements KernelHandler.
  76. type fakeIPSetVersioner struct {
  77. version string
  78. err error
  79. }
  80. func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
  81. return fake.version, fake.err
  82. }
  83. func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet, endpointSlicesEnabled bool) *Proxier {
  84. fcmd := fakeexec.FakeCmd{
  85. CombinedOutputScript: []fakeexec.FakeAction{
  86. func() ([]byte, []byte, error) { return []byte("dummy device have been created"), nil, nil },
  87. func() ([]byte, []byte, error) { return []byte(""), nil, nil },
  88. },
  89. }
  90. fexec := &fakeexec.FakeExec{
  91. CommandScript: []fakeexec.FakeCommandAction{
  92. func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
  93. func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
  94. },
  95. LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
  96. }
  97. // initialize ipsetList with all sets we needed
  98. ipsetList := make(map[string]*IPSet)
  99. for _, is := range ipsetInfo {
  100. ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
  101. }
  102. p := &Proxier{
  103. exec: fexec,
  104. serviceMap: make(proxy.ServiceMap),
  105. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
  106. endpointsMap: make(proxy.EndpointsMap),
  107. endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled),
  108. excludeCIDRs: excludeCIDRs,
  109. iptables: ipt,
  110. ipvs: ipvs,
  111. ipset: ipset,
  112. strictARP: false,
  113. localDetector: proxyutiliptables.NewNoOpLocalDetector(),
  114. hostname: testHostname,
  115. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  116. portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
  117. serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
  118. ipvsScheduler: DefaultScheduler,
  119. ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
  120. iptablesData: bytes.NewBuffer(nil),
  121. filterChainsData: bytes.NewBuffer(nil),
  122. natChains: bytes.NewBuffer(nil),
  123. natRules: bytes.NewBuffer(nil),
  124. filterChains: bytes.NewBuffer(nil),
  125. filterRules: bytes.NewBuffer(nil),
  126. netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
  127. ipsetList: ipsetList,
  128. nodePortAddresses: make([]string, 0),
  129. networkInterfacer: proxyutiltest.NewFakeNetwork(),
  130. gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
  131. }
  132. p.setInitialized(true)
  133. p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
  134. return p
  135. }
  136. func makeNSN(namespace, name string) types.NamespacedName {
  137. return types.NamespacedName{Namespace: namespace, Name: name}
  138. }
  139. func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
  140. for i := range allServices {
  141. proxier.OnServiceAdd(allServices[i])
  142. }
  143. proxier.mu.Lock()
  144. defer proxier.mu.Unlock()
  145. proxier.servicesSynced = true
  146. }
  147. func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
  148. svc := &v1.Service{
  149. ObjectMeta: metav1.ObjectMeta{
  150. Name: name,
  151. Namespace: namespace,
  152. Annotations: map[string]string{},
  153. },
  154. Spec: v1.ServiceSpec{},
  155. Status: v1.ServiceStatus{},
  156. }
  157. svcFunc(svc)
  158. return svc
  159. }
  160. func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
  161. for i := range allEndpoints {
  162. proxier.OnEndpointsAdd(allEndpoints[i])
  163. }
  164. proxier.mu.Lock()
  165. defer proxier.mu.Unlock()
  166. proxier.endpointsSynced = true
  167. }
  168. func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
  169. ept := &v1.Endpoints{
  170. ObjectMeta: metav1.ObjectMeta{
  171. Name: name,
  172. Namespace: namespace,
  173. },
  174. }
  175. eptFunc(ept)
  176. return ept
  177. }
  178. func TestCleanupLeftovers(t *testing.T) {
  179. ipt := iptablestest.NewFake()
  180. ipvs := ipvstest.NewFake()
  181. ipset := ipsettest.NewFake(testIPSetVersion)
  182. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  183. svcIP := "10.20.30.41"
  184. svcPort := 80
  185. svcNodePort := 3001
  186. svcPortName := proxy.ServicePortName{
  187. NamespacedName: makeNSN("ns1", "svc1"),
  188. Port: "p80",
  189. }
  190. makeServiceMap(fp,
  191. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  192. svc.Spec.Type = "NodePort"
  193. svc.Spec.ClusterIP = svcIP
  194. svc.Spec.Ports = []v1.ServicePort{{
  195. Name: svcPortName.Port,
  196. Port: int32(svcPort),
  197. Protocol: v1.ProtocolTCP,
  198. NodePort: int32(svcNodePort),
  199. }}
  200. }),
  201. )
  202. epIP := "10.180.0.1"
  203. makeEndpointsMap(fp,
  204. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  205. ept.Subsets = []v1.EndpointSubset{{
  206. Addresses: []v1.EndpointAddress{{
  207. IP: epIP,
  208. }},
  209. Ports: []v1.EndpointPort{{
  210. Name: svcPortName.Port,
  211. Port: int32(svcPort),
  212. }},
  213. }}
  214. }),
  215. )
  216. fp.syncProxyRules()
  217. // test cleanup left over
  218. if CleanupLeftovers(ipvs, ipt, ipset, true) {
  219. t.Errorf("Cleanup leftovers failed")
  220. }
  221. }
  222. func TestCanUseIPVSProxier(t *testing.T) {
  223. testCases := []struct {
  224. mods []string
  225. kernelVersion string
  226. kernelErr error
  227. ipsetVersion string
  228. ipsetErr error
  229. ok bool
  230. }{
  231. // case 0, kernel error
  232. {
  233. mods: []string{"foo", "bar", "baz"},
  234. kernelVersion: "4.19",
  235. kernelErr: fmt.Errorf("oops"),
  236. ipsetVersion: "0.0",
  237. ok: false,
  238. },
  239. // case 1, ipset error
  240. {
  241. mods: []string{"foo", "bar", "baz"},
  242. kernelVersion: "4.19",
  243. ipsetVersion: MinIPSetCheckVersion,
  244. ipsetErr: fmt.Errorf("oops"),
  245. ok: false,
  246. },
  247. // case 2, missing required kernel modules and ipset version too low
  248. {
  249. mods: []string{"foo", "bar", "baz"},
  250. kernelVersion: "4.19",
  251. ipsetVersion: "1.1",
  252. ok: false,
  253. },
  254. // case 3, missing required ip_vs_* kernel modules
  255. {
  256. mods: []string{"ip_vs", "a", "bc", "def"},
  257. kernelVersion: "4.19",
  258. ipsetVersion: MinIPSetCheckVersion,
  259. ok: false,
  260. },
  261. // case 4, ipset version too low
  262. {
  263. mods: []string{"ip_vs", "ip_vs_rr", "ip_vs_wrr", "ip_vs_sh", "nf_conntrack"},
  264. kernelVersion: "4.19",
  265. ipsetVersion: "4.3.0",
  266. ok: false,
  267. },
  268. // case 5, ok for linux kernel 4.19
  269. {
  270. mods: []string{"ip_vs", "ip_vs_rr", "ip_vs_wrr", "ip_vs_sh", "nf_conntrack"},
  271. kernelVersion: "4.19",
  272. ipsetVersion: MinIPSetCheckVersion,
  273. ok: true,
  274. },
  275. // case 6, ok for linux kernel 4.18
  276. {
  277. mods: []string{"ip_vs", "ip_vs_rr", "ip_vs_wrr", "ip_vs_sh", "nf_conntrack_ipv4"},
  278. kernelVersion: "4.18",
  279. ipsetVersion: MinIPSetCheckVersion,
  280. ok: true,
  281. },
  282. // case 7. ok when module list has extra modules
  283. {
  284. mods: []string{"foo", "ip_vs", "ip_vs_rr", "ip_vs_wrr", "ip_vs_sh", "nf_conntrack", "bar"},
  285. kernelVersion: "4.19",
  286. ipsetVersion: "6.19",
  287. ok: true,
  288. },
  289. }
  290. for i := range testCases {
  291. handle := &fakeKernelHandler{modules: testCases[i].mods, kernelVersion: testCases[i].kernelVersion}
  292. versioner := &fakeIPSetVersioner{version: testCases[i].ipsetVersion, err: testCases[i].ipsetErr}
  293. ok, err := CanUseIPVSProxier(handle, versioner)
  294. if ok != testCases[i].ok {
  295. t.Errorf("Case [%d], expect %v, got %v: err: %v", i, testCases[i].ok, ok, err)
  296. }
  297. }
  298. }
  299. func TestGetNodeIPs(t *testing.T) {
  300. testCases := []struct {
  301. devAddresses map[string][]string
  302. expectIPs []string
  303. }{
  304. // case 0
  305. {
  306. devAddresses: map[string][]string{"eth0": {"1.2.3.4"}, "lo": {"127.0.0.1"}},
  307. expectIPs: []string{"1.2.3.4", "127.0.0.1"},
  308. },
  309. // case 1
  310. {
  311. devAddresses: map[string][]string{"lo": {"127.0.0.1"}},
  312. expectIPs: []string{"127.0.0.1"},
  313. },
  314. // case 2
  315. {
  316. devAddresses: map[string][]string{},
  317. expectIPs: []string{},
  318. },
  319. // case 3
  320. {
  321. devAddresses: map[string][]string{"encap0": {"10.20.30.40"}, "lo": {"127.0.0.1"}, "docker0": {"172.17.0.1"}},
  322. expectIPs: []string{"10.20.30.40", "127.0.0.1", "172.17.0.1"},
  323. },
  324. // case 4
  325. {
  326. devAddresses: map[string][]string{"encaps9": {"10.20.30.40"}, "lo": {"127.0.0.1"}, "encap7": {"10.20.30.31"}},
  327. expectIPs: []string{"10.20.30.40", "127.0.0.1", "10.20.30.31"},
  328. },
  329. // case 5
  330. {
  331. devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4"}, "lo": {"127.0.0.1"}, "encap7": {"10.20.30.31"}},
  332. expectIPs: []string{"127.0.0.1", "10.20.30.31"},
  333. },
  334. // case 6
  335. {
  336. devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "lo": {"127.0.0.1"}},
  337. expectIPs: []string{"127.0.0.1"},
  338. },
  339. // case 7
  340. {
  341. devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}},
  342. expectIPs: []string{},
  343. },
  344. // case 8
  345. {
  346. devAddresses: map[string][]string{"kube-ipvs0": {"1.2.3.4", "2.3.4.5"}, "eth5": {"3.4.5.6"}, "lo": {"127.0.0.1"}},
  347. expectIPs: []string{"127.0.0.1", "3.4.5.6"},
  348. },
  349. // case 9
  350. {
  351. devAddresses: map[string][]string{"ipvs0": {"1.2.3.4"}, "lo": {"127.0.0.1"}, "encap7": {"10.20.30.31"}},
  352. expectIPs: []string{"127.0.0.1", "10.20.30.31", "1.2.3.4"},
  353. },
  354. }
  355. for i := range testCases {
  356. fake := netlinktest.NewFakeNetlinkHandle()
  357. for dev, addresses := range testCases[i].devAddresses {
  358. fake.SetLocalAddresses(dev, addresses...)
  359. }
  360. r := realIPGetter{nl: fake}
  361. ips, err := r.NodeIPs()
  362. if err != nil {
  363. t.Errorf("Unexpected error: %v", err)
  364. }
  365. ipStrs := sets.NewString()
  366. for _, ip := range ips {
  367. ipStrs.Insert(ip.String())
  368. }
  369. if !ipStrs.Equal(sets.NewString(testCases[i].expectIPs...)) {
  370. t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, testCases[i].expectIPs, ips)
  371. }
  372. }
  373. }
  374. func TestNodePort(t *testing.T) {
  375. tests := []struct {
  376. name string
  377. services []*v1.Service
  378. endpoints []*v1.Endpoints
  379. nodeIPs []net.IP
  380. nodePortAddresses []string
  381. expectedIPVS *ipvstest.FakeIPVS
  382. expectedIPSets netlinktest.ExpectedIPSet
  383. expectedIptablesChains netlinktest.ExpectedIptablesChain
  384. }{
  385. {
  386. name: "1 service with node port, has 2 endpoints",
  387. services: []*v1.Service{
  388. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  389. svc.Spec.Type = "NodePort"
  390. svc.Spec.ClusterIP = "10.20.30.41"
  391. svc.Spec.Ports = []v1.ServicePort{{
  392. Name: "p80",
  393. Port: int32(80),
  394. Protocol: v1.ProtocolTCP,
  395. NodePort: int32(3001),
  396. }}
  397. }),
  398. },
  399. endpoints: []*v1.Endpoints{
  400. makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
  401. ept.Subsets = []v1.EndpointSubset{{
  402. Addresses: []v1.EndpointAddress{{
  403. IP: "10.180.0.1",
  404. }, {
  405. IP: "1002:ab8::2:10",
  406. }},
  407. Ports: []v1.EndpointPort{{
  408. Name: "p80",
  409. Port: int32(80),
  410. Protocol: v1.ProtocolTCP,
  411. }},
  412. }}
  413. }),
  414. },
  415. nodeIPs: []net.IP{
  416. net.ParseIP("100.101.102.103"),
  417. net.ParseIP("2001:db8::1:1"),
  418. },
  419. nodePortAddresses: []string{},
  420. expectedIPVS: &ipvstest.FakeIPVS{
  421. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  422. {
  423. IP: "10.20.30.41",
  424. Port: 80,
  425. Protocol: "TCP",
  426. }: {
  427. Address: net.ParseIP("10.20.30.41"),
  428. Protocol: "TCP",
  429. Port: uint16(80),
  430. Scheduler: "rr",
  431. },
  432. {
  433. IP: "100.101.102.103",
  434. Port: 3001,
  435. Protocol: "TCP",
  436. }: {
  437. Address: net.ParseIP("100.101.102.103"),
  438. Protocol: "TCP",
  439. Port: uint16(3001),
  440. Scheduler: "rr",
  441. },
  442. {
  443. IP: "2001:db8::1:1",
  444. Port: 3001,
  445. Protocol: "TCP",
  446. }: {
  447. Address: net.ParseIP("2001:db8::1:1"),
  448. Protocol: "TCP",
  449. Port: uint16(3001),
  450. Scheduler: "rr",
  451. },
  452. },
  453. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  454. {
  455. IP: "10.20.30.41",
  456. Port: 80,
  457. Protocol: "TCP",
  458. }: {
  459. {
  460. Address: net.ParseIP("10.180.0.1"),
  461. Port: uint16(80),
  462. Weight: 1,
  463. },
  464. {
  465. Address: net.ParseIP("1002:ab8::2:10"),
  466. Port: uint16(80),
  467. Weight: 1,
  468. },
  469. },
  470. {
  471. IP: "100.101.102.103",
  472. Port: 3001,
  473. Protocol: "TCP",
  474. }: {
  475. {
  476. Address: net.ParseIP("10.180.0.1"),
  477. Port: uint16(80),
  478. Weight: 1,
  479. },
  480. {
  481. Address: net.ParseIP("1002:ab8::2:10"),
  482. Port: uint16(80),
  483. Weight: 1,
  484. },
  485. },
  486. {
  487. IP: "2001:db8::1:1",
  488. Port: 3001,
  489. Protocol: "TCP",
  490. }: {
  491. {
  492. Address: net.ParseIP("10.180.0.1"),
  493. Port: uint16(80),
  494. Weight: 1,
  495. },
  496. {
  497. Address: net.ParseIP("1002:ab8::2:10"),
  498. Port: uint16(80),
  499. Weight: 1,
  500. },
  501. },
  502. },
  503. },
  504. },
  505. {
  506. name: "1 UDP service with node port, has endpoints",
  507. services: []*v1.Service{
  508. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  509. svc.Spec.Type = "NodePort"
  510. svc.Spec.ClusterIP = "10.20.30.41"
  511. svc.Spec.Ports = []v1.ServicePort{{
  512. Name: "p80",
  513. Port: int32(80),
  514. Protocol: v1.ProtocolUDP,
  515. NodePort: int32(3001),
  516. }}
  517. }),
  518. },
  519. endpoints: []*v1.Endpoints{
  520. makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
  521. ept.Subsets = []v1.EndpointSubset{{
  522. Addresses: []v1.EndpointAddress{{
  523. IP: "10.180.0.1",
  524. }},
  525. Ports: []v1.EndpointPort{{
  526. Name: "p80",
  527. Port: int32(80),
  528. Protocol: v1.ProtocolUDP,
  529. }},
  530. }}
  531. }),
  532. },
  533. nodeIPs: []net.IP{
  534. net.ParseIP("100.101.102.103"),
  535. },
  536. nodePortAddresses: []string{"0.0.0.0/0"},
  537. expectedIPVS: &ipvstest.FakeIPVS{
  538. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  539. {
  540. IP: "10.20.30.41",
  541. Port: 80,
  542. Protocol: "UDP",
  543. }: {
  544. Address: net.ParseIP("10.20.30.41"),
  545. Protocol: "UDP",
  546. Port: uint16(80),
  547. Scheduler: "rr",
  548. },
  549. {
  550. IP: "100.101.102.103",
  551. Port: 3001,
  552. Protocol: "UDP",
  553. }: {
  554. Address: net.ParseIP("100.101.102.103"),
  555. Protocol: "UDP",
  556. Port: uint16(3001),
  557. Scheduler: "rr",
  558. },
  559. },
  560. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  561. {
  562. IP: "10.20.30.41",
  563. Port: 80,
  564. Protocol: "UDP",
  565. }: {
  566. {
  567. Address: net.ParseIP("10.180.0.1"),
  568. Port: uint16(80),
  569. Weight: 1,
  570. },
  571. },
  572. {
  573. IP: "100.101.102.103",
  574. Port: 3001,
  575. Protocol: "UDP",
  576. }: {
  577. {
  578. Address: net.ParseIP("10.180.0.1"),
  579. Port: uint16(80),
  580. Weight: 1,
  581. },
  582. },
  583. },
  584. },
  585. expectedIPSets: netlinktest.ExpectedIPSet{
  586. kubeNodePortSetUDP: {{
  587. Port: 3001,
  588. Protocol: strings.ToLower(string(v1.ProtocolUDP)),
  589. SetType: utilipset.BitmapPort,
  590. }},
  591. },
  592. expectedIptablesChains: netlinktest.ExpectedIptablesChain{
  593. string(KubeNodePortChain): {{
  594. JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP,
  595. }},
  596. string(kubeServicesChain): {{
  597. JumpChain: string(KubeNodePortChain), MatchSet: "",
  598. }},
  599. },
  600. },
  601. {
  602. name: "service has node port but no endpoints",
  603. services: []*v1.Service{
  604. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  605. svc.Spec.Type = "NodePort"
  606. svc.Spec.ClusterIP = "10.20.30.41"
  607. svc.Spec.Ports = []v1.ServicePort{{
  608. Name: "p80",
  609. Port: int32(80),
  610. Protocol: v1.ProtocolTCP,
  611. NodePort: int32(3001),
  612. }}
  613. }),
  614. },
  615. endpoints: []*v1.Endpoints{},
  616. nodeIPs: []net.IP{
  617. net.ParseIP("100.101.102.103"),
  618. },
  619. nodePortAddresses: []string{},
  620. expectedIPVS: &ipvstest.FakeIPVS{
  621. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  622. {
  623. IP: "10.20.30.41",
  624. Port: 80,
  625. Protocol: "TCP",
  626. }: {
  627. Address: net.ParseIP("10.20.30.41"),
  628. Protocol: "TCP",
  629. Port: uint16(80),
  630. Scheduler: "rr",
  631. },
  632. {
  633. IP: "100.101.102.103",
  634. Port: 3001,
  635. Protocol: "TCP",
  636. }: {
  637. Address: net.ParseIP("100.101.102.103"),
  638. Protocol: "TCP",
  639. Port: uint16(3001),
  640. Scheduler: "rr",
  641. },
  642. },
  643. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  644. {
  645. IP: "10.20.30.41",
  646. Port: 80,
  647. Protocol: "TCP",
  648. }: {}, // no real servers corresponding to no endpoints
  649. {
  650. IP: "100.101.102.103",
  651. Port: 3001,
  652. Protocol: "TCP",
  653. }: {}, // no real servers corresponding to no endpoints
  654. },
  655. },
  656. },
  657. {
  658. name: "node port service with protocol sctp on a node with multiple nodeIPs",
  659. services: []*v1.Service{
  660. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  661. svc.Spec.Type = "NodePort"
  662. svc.Spec.ClusterIP = "10.20.30.41"
  663. svc.Spec.Ports = []v1.ServicePort{{
  664. Name: "p80",
  665. Port: int32(80),
  666. Protocol: v1.ProtocolSCTP,
  667. NodePort: int32(3001),
  668. }}
  669. }),
  670. },
  671. endpoints: []*v1.Endpoints{
  672. makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
  673. ept.Subsets = []v1.EndpointSubset{{
  674. Addresses: []v1.EndpointAddress{{
  675. IP: "10.180.0.1",
  676. }},
  677. Ports: []v1.EndpointPort{{
  678. Name: "p80",
  679. Port: int32(80),
  680. Protocol: v1.ProtocolSCTP,
  681. }},
  682. }}
  683. }),
  684. },
  685. nodeIPs: []net.IP{
  686. net.ParseIP("100.101.102.103"),
  687. net.ParseIP("100.101.102.104"),
  688. net.ParseIP("100.101.102.105"),
  689. net.ParseIP("2001:db8::1:1"),
  690. net.ParseIP("2001:db8::1:2"),
  691. net.ParseIP("2001:db8::1:3"),
  692. },
  693. nodePortAddresses: []string{},
  694. expectedIPVS: &ipvstest.FakeIPVS{
  695. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  696. {
  697. IP: "10.20.30.41",
  698. Port: 80,
  699. Protocol: "SCTP",
  700. }: {
  701. Address: net.ParseIP("10.20.30.41"),
  702. Protocol: "SCTP",
  703. Port: uint16(80),
  704. Scheduler: "rr",
  705. },
  706. {
  707. IP: "100.101.102.103",
  708. Port: 3001,
  709. Protocol: "SCTP",
  710. }: {
  711. Address: net.ParseIP("100.101.102.103"),
  712. Protocol: "SCTP",
  713. Port: uint16(3001),
  714. Scheduler: "rr",
  715. },
  716. {
  717. IP: "100.101.102.104",
  718. Port: 3001,
  719. Protocol: "SCTP",
  720. }: {
  721. Address: net.ParseIP("100.101.102.104"),
  722. Protocol: "SCTP",
  723. Port: uint16(3001),
  724. Scheduler: "rr",
  725. },
  726. {
  727. IP: "100.101.102.105",
  728. Port: 3001,
  729. Protocol: "SCTP",
  730. }: {
  731. Address: net.ParseIP("100.101.102.105"),
  732. Protocol: "SCTP",
  733. Port: uint16(3001),
  734. Scheduler: "rr",
  735. },
  736. {
  737. IP: "2001:db8::1:1",
  738. Port: 3001,
  739. Protocol: "SCTP",
  740. }: {
  741. Address: net.ParseIP("2001:db8::1:1"),
  742. Protocol: "SCTP",
  743. Port: uint16(3001),
  744. Scheduler: "rr",
  745. },
  746. {
  747. IP: "2001:db8::1:2",
  748. Port: 3001,
  749. Protocol: "SCTP",
  750. }: {
  751. Address: net.ParseIP("2001:db8::1:2"),
  752. Protocol: "SCTP",
  753. Port: uint16(3001),
  754. Scheduler: "rr",
  755. },
  756. {
  757. IP: "2001:db8::1:3",
  758. Port: 3001,
  759. Protocol: "SCTP",
  760. }: {
  761. Address: net.ParseIP("2001:db8::1:3"),
  762. Protocol: "SCTP",
  763. Port: uint16(3001),
  764. Scheduler: "rr",
  765. },
  766. },
  767. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  768. {
  769. IP: "10.20.30.41",
  770. Port: 80,
  771. Protocol: "SCTP",
  772. }: {
  773. {
  774. Address: net.ParseIP("10.180.0.1"),
  775. Port: uint16(80),
  776. Weight: 1,
  777. },
  778. },
  779. {
  780. IP: "100.101.102.103",
  781. Port: 3001,
  782. Protocol: "SCTP",
  783. }: {
  784. {
  785. Address: net.ParseIP("10.180.0.1"),
  786. Port: uint16(80),
  787. Weight: 1,
  788. },
  789. },
  790. {
  791. IP: "100.101.102.104",
  792. Port: 3001,
  793. Protocol: "SCTP",
  794. }: {
  795. {
  796. Address: net.ParseIP("10.180.0.1"),
  797. Port: uint16(80),
  798. Weight: 1,
  799. },
  800. },
  801. {
  802. IP: "100.101.102.105",
  803. Port: 3001,
  804. Protocol: "SCTP",
  805. }: {
  806. {
  807. Address: net.ParseIP("10.180.0.1"),
  808. Port: uint16(80),
  809. Weight: 1,
  810. },
  811. },
  812. {
  813. IP: "2001:db8::1:1",
  814. Port: 3001,
  815. Protocol: "SCTP",
  816. }: {
  817. {
  818. Address: net.ParseIP("10.180.0.1"),
  819. Port: uint16(80),
  820. Weight: 1,
  821. },
  822. },
  823. {
  824. IP: "2001:db8::1:2",
  825. Port: 3001,
  826. Protocol: "SCTP",
  827. }: {
  828. {
  829. Address: net.ParseIP("10.180.0.1"),
  830. Port: uint16(80),
  831. Weight: 1,
  832. },
  833. },
  834. {
  835. IP: "2001:db8::1:3",
  836. Port: 3001,
  837. Protocol: "SCTP",
  838. }: {
  839. {
  840. Address: net.ParseIP("10.180.0.1"),
  841. Port: uint16(80),
  842. Weight: 1,
  843. },
  844. },
  845. },
  846. },
  847. expectedIPSets: netlinktest.ExpectedIPSet{
  848. kubeNodePortSetSCTP: {
  849. {
  850. IP: "100.101.102.103",
  851. Port: 3001,
  852. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  853. SetType: utilipset.HashIPPort,
  854. },
  855. {
  856. IP: "100.101.102.104",
  857. Port: 3001,
  858. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  859. SetType: utilipset.HashIPPort,
  860. },
  861. {
  862. IP: "100.101.102.105",
  863. Port: 3001,
  864. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  865. SetType: utilipset.HashIPPort,
  866. },
  867. {
  868. IP: "2001:db8::1:1",
  869. Port: 3001,
  870. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  871. SetType: utilipset.HashIPPort,
  872. },
  873. {
  874. IP: "2001:db8::1:2",
  875. Port: 3001,
  876. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  877. SetType: utilipset.HashIPPort,
  878. },
  879. {
  880. IP: "2001:db8::1:3",
  881. Port: 3001,
  882. Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
  883. SetType: utilipset.HashIPPort,
  884. },
  885. },
  886. },
  887. },
  888. }
  889. for _, test := range tests {
  890. t.Run(test.name, func(t *testing.T) {
  891. ipt := iptablestest.NewFake()
  892. ipvs := ipvstest.NewFake()
  893. ipset := ipsettest.NewFake(testIPSetVersion)
  894. fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, false)
  895. fp.nodePortAddresses = test.nodePortAddresses
  896. makeServiceMap(fp, test.services...)
  897. makeEndpointsMap(fp, test.endpoints...)
  898. fp.syncProxyRules()
  899. if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
  900. t.Logf("actual ipvs state: %v", ipvs)
  901. t.Logf("expected ipvs state: %v", test.expectedIPVS)
  902. t.Errorf("unexpected IPVS state")
  903. }
  904. if test.expectedIPSets != nil {
  905. checkIPSet(t, fp, test.expectedIPSets)
  906. }
  907. if test.expectedIptablesChains != nil {
  908. checkIptables(t, ipt, test.expectedIptablesChains)
  909. }
  910. })
  911. }
  912. }
  913. func TestClusterIP(t *testing.T) {
  914. tests := []struct {
  915. name string
  916. services []*v1.Service
  917. endpoints []*v1.Endpoints
  918. expectedIPVS *ipvstest.FakeIPVS
  919. }{
  920. {
  921. name: "2 services with Cluster IP, each with endpoints",
  922. services: []*v1.Service{
  923. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  924. svc.Spec.ClusterIP = "10.20.30.41"
  925. svc.Spec.Ports = []v1.ServicePort{{
  926. Name: "p80",
  927. Port: int32(80),
  928. Protocol: v1.ProtocolTCP,
  929. }}
  930. }),
  931. makeTestService("ns2", "svc2", func(svc *v1.Service) {
  932. svc.Spec.ClusterIP = "1002:ab8::2:1"
  933. svc.Spec.Ports = []v1.ServicePort{{
  934. Name: "p8080",
  935. Port: int32(8080),
  936. Protocol: v1.ProtocolTCP,
  937. }}
  938. }),
  939. },
  940. endpoints: []*v1.Endpoints{
  941. makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
  942. ept.Subsets = []v1.EndpointSubset{{
  943. Addresses: []v1.EndpointAddress{{
  944. IP: "10.180.0.1",
  945. }},
  946. Ports: []v1.EndpointPort{{
  947. Name: "p80",
  948. Port: int32(80),
  949. Protocol: v1.ProtocolTCP,
  950. }},
  951. }}
  952. }),
  953. makeTestEndpoints("ns2", "svc2", func(ept *v1.Endpoints) {
  954. ept.Subsets = []v1.EndpointSubset{{
  955. Addresses: []v1.EndpointAddress{{
  956. IP: "1009:ab8::5:6",
  957. }},
  958. Ports: []v1.EndpointPort{{
  959. Name: "p8080",
  960. Port: int32(8080),
  961. Protocol: v1.ProtocolTCP,
  962. }},
  963. }}
  964. }),
  965. },
  966. expectedIPVS: &ipvstest.FakeIPVS{
  967. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  968. {
  969. IP: "10.20.30.41",
  970. Port: 80,
  971. Protocol: "TCP",
  972. }: {
  973. Address: net.ParseIP("10.20.30.41"),
  974. Protocol: "TCP",
  975. Port: uint16(80),
  976. Scheduler: "rr",
  977. },
  978. {
  979. IP: "1002:ab8::2:1",
  980. Port: 8080,
  981. Protocol: "TCP",
  982. }: {
  983. Address: net.ParseIP("1002:ab8::2:1"),
  984. Protocol: "TCP",
  985. Port: uint16(8080),
  986. Scheduler: "rr",
  987. },
  988. },
  989. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  990. {
  991. IP: "10.20.30.41",
  992. Port: 80,
  993. Protocol: "TCP",
  994. }: {
  995. {
  996. Address: net.ParseIP("10.180.0.1"),
  997. Port: uint16(80),
  998. Weight: 1,
  999. },
  1000. },
  1001. {
  1002. IP: "1002:ab8::2:1",
  1003. Port: 8080,
  1004. Protocol: "TCP",
  1005. }: {
  1006. {
  1007. Address: net.ParseIP("1009:ab8::5:6"),
  1008. Port: uint16(8080),
  1009. Weight: 1,
  1010. },
  1011. },
  1012. },
  1013. },
  1014. },
  1015. {
  1016. name: "cluster IP service with no endpoints",
  1017. services: []*v1.Service{
  1018. makeTestService("ns1", "svc1", func(svc *v1.Service) {
  1019. svc.Spec.ClusterIP = "10.20.30.41"
  1020. svc.Spec.Ports = []v1.ServicePort{{
  1021. Name: "p80",
  1022. Port: int32(80),
  1023. Protocol: v1.ProtocolTCP,
  1024. }}
  1025. }),
  1026. },
  1027. endpoints: []*v1.Endpoints{},
  1028. expectedIPVS: &ipvstest.FakeIPVS{
  1029. Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
  1030. {
  1031. IP: "10.20.30.41",
  1032. Port: 80,
  1033. Protocol: "TCP",
  1034. }: {
  1035. Address: net.ParseIP("10.20.30.41"),
  1036. Protocol: "TCP",
  1037. Port: uint16(80),
  1038. Scheduler: "rr",
  1039. },
  1040. },
  1041. Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
  1042. {
  1043. IP: "10.20.30.41",
  1044. Port: 80,
  1045. Protocol: "TCP",
  1046. }: {},
  1047. },
  1048. },
  1049. },
  1050. }
  1051. for _, test := range tests {
  1052. t.Run(test.name, func(t *testing.T) {
  1053. ipt := iptablestest.NewFake()
  1054. ipvs := ipvstest.NewFake()
  1055. ipset := ipsettest.NewFake(testIPSetVersion)
  1056. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1057. makeServiceMap(fp, test.services...)
  1058. makeEndpointsMap(fp, test.endpoints...)
  1059. fp.syncProxyRules()
  1060. if !reflect.DeepEqual(ipvs, test.expectedIPVS) {
  1061. t.Logf("actual ipvs state: %v", ipvs)
  1062. t.Logf("expected ipvs state: %v", test.expectedIPVS)
  1063. t.Errorf("unexpected IPVS state")
  1064. }
  1065. })
  1066. }
  1067. }
  1068. func TestMasqueradeRule(t *testing.T) {
  1069. for _, testcase := range []bool{false, true} {
  1070. ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
  1071. ipvs := ipvstest.NewFake()
  1072. ipset := ipsettest.NewFake(testIPSetVersion)
  1073. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1074. makeServiceMap(fp)
  1075. makeEndpointsMap(fp)
  1076. fp.syncProxyRules()
  1077. postRoutingRules := ipt.GetRules(string(kubePostroutingChain))
  1078. if !hasJump(postRoutingRules, "MASQUERADE", "") {
  1079. t.Errorf("Failed to find -j MASQUERADE in %s chain", kubePostroutingChain)
  1080. }
  1081. if hasMasqRandomFully(postRoutingRules) != testcase {
  1082. probs := map[bool]string{false: "found", true: "did not find"}
  1083. t.Errorf("%s --random-fully in -j MASQUERADE rule in %s chain for HasRandomFully()=%v", probs[testcase], kubePostroutingChain, testcase)
  1084. }
  1085. }
  1086. }
  1087. func TestExternalIPsNoEndpoint(t *testing.T) {
  1088. ipt := iptablestest.NewFake()
  1089. ipvs := ipvstest.NewFake()
  1090. ipset := ipsettest.NewFake(testIPSetVersion)
  1091. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1092. svcIP := "10.20.30.41"
  1093. svcPort := 80
  1094. svcExternalIPs := "50.60.70.81"
  1095. svcPortName := proxy.ServicePortName{
  1096. NamespacedName: makeNSN("ns1", "svc1"),
  1097. Port: "p80",
  1098. }
  1099. makeServiceMap(fp,
  1100. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1101. svc.Spec.Type = "ClusterIP"
  1102. svc.Spec.ClusterIP = svcIP
  1103. svc.Spec.ExternalIPs = []string{svcExternalIPs}
  1104. svc.Spec.Ports = []v1.ServicePort{{
  1105. Name: svcPortName.Port,
  1106. Port: int32(svcPort),
  1107. Protocol: v1.ProtocolTCP,
  1108. TargetPort: intstr.FromInt(svcPort),
  1109. }}
  1110. }),
  1111. )
  1112. makeEndpointsMap(fp)
  1113. fp.syncProxyRules()
  1114. // check ipvs service and destinations
  1115. services, err := ipvs.GetVirtualServers()
  1116. if err != nil {
  1117. t.Errorf("Failed to get ipvs services, err: %v", err)
  1118. }
  1119. if len(services) != 2 {
  1120. t.Errorf("Expect 2 ipvs services, got %d", len(services))
  1121. }
  1122. found := false
  1123. for _, svc := range services {
  1124. if svc.Address.String() == svcExternalIPs && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
  1125. found = true
  1126. destinations, _ := ipvs.GetRealServers(svc)
  1127. if len(destinations) != 0 {
  1128. t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations))
  1129. }
  1130. break
  1131. }
  1132. }
  1133. if !found {
  1134. t.Errorf("Expect external ip type service, got none")
  1135. }
  1136. }
  1137. func TestExternalIPs(t *testing.T) {
  1138. ipt := iptablestest.NewFake()
  1139. ipvs := ipvstest.NewFake()
  1140. ipset := ipsettest.NewFake(testIPSetVersion)
  1141. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1142. svcIP := "10.20.30.41"
  1143. svcPort := 80
  1144. svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1")
  1145. svcPortName := proxy.ServicePortName{
  1146. NamespacedName: makeNSN("ns1", "svc1"),
  1147. Port: "p80",
  1148. }
  1149. makeServiceMap(fp,
  1150. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1151. svc.Spec.Type = "ClusterIP"
  1152. svc.Spec.ClusterIP = svcIP
  1153. svc.Spec.ExternalIPs = svcExternalIPs.UnsortedList()
  1154. svc.Spec.Ports = []v1.ServicePort{{
  1155. Name: svcPortName.Port,
  1156. Port: int32(svcPort),
  1157. Protocol: v1.ProtocolTCP,
  1158. TargetPort: intstr.FromInt(svcPort),
  1159. }}
  1160. }),
  1161. )
  1162. epIP := "10.180.0.1"
  1163. makeEndpointsMap(fp,
  1164. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  1165. ept.Subsets = []v1.EndpointSubset{{
  1166. Addresses: []v1.EndpointAddress{{
  1167. IP: epIP,
  1168. }},
  1169. Ports: []v1.EndpointPort{{
  1170. Name: svcPortName.Port,
  1171. Port: int32(svcPort),
  1172. }},
  1173. }}
  1174. }),
  1175. )
  1176. fp.syncProxyRules()
  1177. // check ipvs service and destinations
  1178. services, err := ipvs.GetVirtualServers()
  1179. if err != nil {
  1180. t.Errorf("Failed to get ipvs services, err: %v", err)
  1181. }
  1182. if len(services) != 4 {
  1183. t.Errorf("Expect 4 ipvs services, got %d", len(services))
  1184. }
  1185. found := false
  1186. for _, svc := range services {
  1187. if svcExternalIPs.Has(svc.Address.String()) && svc.Port == uint16(svcPort) && svc.Protocol == string(v1.ProtocolTCP) {
  1188. found = true
  1189. destinations, _ := ipvs.GetRealServers(svc)
  1190. for _, dest := range destinations {
  1191. if dest.Address.String() != epIP || dest.Port != uint16(svcPort) {
  1192. t.Errorf("service Endpoint mismatch ipvs service destination")
  1193. }
  1194. }
  1195. break
  1196. }
  1197. }
  1198. if !found {
  1199. t.Errorf("Expect external ip type service, got none")
  1200. }
  1201. }
  1202. func TestLoadBalancer(t *testing.T) {
  1203. ipt, fp := buildFakeProxier()
  1204. svcIP := "10.20.30.41"
  1205. svcPort := 80
  1206. svcNodePort := 3001
  1207. svcLBIP := "1.2.3.4"
  1208. svcPortName := proxy.ServicePortName{
  1209. NamespacedName: makeNSN("ns1", "svc1"),
  1210. Port: "p80",
  1211. }
  1212. makeServiceMap(fp,
  1213. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1214. svc.Spec.Type = "LoadBalancer"
  1215. svc.Spec.ClusterIP = svcIP
  1216. svc.Spec.Ports = []v1.ServicePort{{
  1217. Name: svcPortName.Port,
  1218. Port: int32(svcPort),
  1219. Protocol: v1.ProtocolTCP,
  1220. NodePort: int32(svcNodePort),
  1221. }}
  1222. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  1223. IP: svcLBIP,
  1224. }}
  1225. }),
  1226. )
  1227. epIP := "10.180.0.1"
  1228. makeEndpointsMap(fp,
  1229. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  1230. ept.Subsets = []v1.EndpointSubset{{
  1231. Addresses: []v1.EndpointAddress{{
  1232. IP: epIP,
  1233. }},
  1234. Ports: []v1.EndpointPort{{
  1235. Name: svcPortName.Port,
  1236. Port: int32(svcPort),
  1237. }},
  1238. }}
  1239. }),
  1240. )
  1241. fp.syncProxyRules()
  1242. // Expect 2 services and 1 destination
  1243. epVS := &netlinktest.ExpectedVirtualServer{
  1244. VSNum: 2, IP: svcLBIP, Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP),
  1245. RS: []netlinktest.ExpectedRealServer{{
  1246. IP: epIP, Port: uint16(svcPort),
  1247. }}}
  1248. checkIPVS(t, fp, epVS)
  1249. // check ipSet rules
  1250. epIPSet := netlinktest.ExpectedIPSet{
  1251. kubeLoadBalancerSet: {{
  1252. IP: svcLBIP,
  1253. Port: svcPort,
  1254. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1255. SetType: utilipset.HashIPPort,
  1256. }},
  1257. }
  1258. checkIPSet(t, fp, epIPSet)
  1259. // Check iptables chain and rules
  1260. epIpt := netlinktest.ExpectedIptablesChain{
  1261. string(kubeServicesChain): {{
  1262. JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
  1263. }},
  1264. string(kubeLoadBalancerSet): {{
  1265. JumpChain: string(KubeMarkMasqChain), MatchSet: "",
  1266. }},
  1267. }
  1268. checkIptables(t, ipt, epIpt)
  1269. }
  1270. func TestOnlyLocalNodePorts(t *testing.T) {
  1271. nodeIP := net.ParseIP("100.101.102.103")
  1272. ipt, fp := buildFakeProxier()
  1273. svcIP := "10.20.30.41"
  1274. svcPort := 80
  1275. svcNodePort := 3001
  1276. svcPortName := proxy.ServicePortName{
  1277. NamespacedName: makeNSN("ns1", "svc1"),
  1278. Port: "p80",
  1279. }
  1280. makeServiceMap(fp,
  1281. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1282. svc.Spec.Type = "NodePort"
  1283. svc.Spec.ClusterIP = svcIP
  1284. svc.Spec.Ports = []v1.ServicePort{{
  1285. Name: svcPortName.Port,
  1286. Port: int32(svcPort),
  1287. Protocol: v1.ProtocolTCP,
  1288. NodePort: int32(svcNodePort),
  1289. }}
  1290. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1291. }),
  1292. )
  1293. epIP := "10.180.0.1"
  1294. epIP1 := "10.180.1.1"
  1295. thisHostname := testHostname
  1296. otherHostname := "other-hostname"
  1297. makeEndpointsMap(fp,
  1298. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  1299. ept.Subsets = []v1.EndpointSubset{
  1300. { // **local** endpoint address, should be added as RS
  1301. Addresses: []v1.EndpointAddress{{
  1302. IP: epIP,
  1303. NodeName: &thisHostname,
  1304. }},
  1305. Ports: []v1.EndpointPort{{
  1306. Name: svcPortName.Port,
  1307. Port: int32(svcPort),
  1308. Protocol: v1.ProtocolTCP,
  1309. }}},
  1310. { // **remote** endpoint address, should not be added as RS
  1311. Addresses: []v1.EndpointAddress{{
  1312. IP: epIP1,
  1313. NodeName: &otherHostname,
  1314. }},
  1315. Ports: []v1.EndpointPort{{
  1316. Name: svcPortName.Port,
  1317. Port: int32(svcPort),
  1318. Protocol: v1.ProtocolTCP,
  1319. }},
  1320. }}
  1321. }),
  1322. )
  1323. itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
  1324. addrs := []net.Addr{proxyutiltest.AddrStruct{Val: "100.101.102.103/24"}}
  1325. itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
  1326. addrs1 := []net.Addr{proxyutiltest.AddrStruct{Val: "2001:db8::0/64"}}
  1327. fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
  1328. fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
  1329. fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"}
  1330. fp.syncProxyRules()
  1331. // Expect 3 services and 1 destination
  1332. epVS := &netlinktest.ExpectedVirtualServer{
  1333. VSNum: 3, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(v1.ProtocolTCP),
  1334. RS: []netlinktest.ExpectedRealServer{{
  1335. IP: epIP, Port: uint16(svcPort),
  1336. }}}
  1337. checkIPVS(t, fp, epVS)
  1338. // check ipSet rules
  1339. epEntry := &utilipset.Entry{
  1340. Port: svcNodePort,
  1341. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1342. SetType: utilipset.BitmapPort,
  1343. }
  1344. epIPSet := netlinktest.ExpectedIPSet{
  1345. kubeNodePortSetTCP: {epEntry},
  1346. kubeNodePortLocalSetTCP: {epEntry},
  1347. }
  1348. checkIPSet(t, fp, epIPSet)
  1349. // Check iptables chain and rules
  1350. epIpt := netlinktest.ExpectedIptablesChain{
  1351. string(kubeServicesChain): {{
  1352. JumpChain: string(KubeNodePortChain), MatchSet: "",
  1353. }},
  1354. string(KubeNodePortChain): {{
  1355. JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
  1356. }, {
  1357. JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
  1358. }},
  1359. }
  1360. checkIptables(t, ipt, epIpt)
  1361. }
  1362. func TestLoadBalanceSourceRanges(t *testing.T) {
  1363. ipt, fp := buildFakeProxier()
  1364. svcIP := "10.20.30.41"
  1365. svcPort := 80
  1366. svcLBIP := "1.2.3.4"
  1367. svcLBSource := "10.0.0.0/8"
  1368. svcPortName := proxy.ServicePortName{
  1369. NamespacedName: makeNSN("ns1", "svc1"),
  1370. Port: "p80",
  1371. }
  1372. epIP := "10.180.0.1"
  1373. makeServiceMap(fp,
  1374. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1375. svc.Spec.Type = "LoadBalancer"
  1376. svc.Spec.ClusterIP = svcIP
  1377. svc.Spec.Ports = []v1.ServicePort{{
  1378. Name: svcPortName.Port,
  1379. Port: int32(svcPort),
  1380. Protocol: v1.ProtocolTCP,
  1381. }}
  1382. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  1383. IP: svcLBIP,
  1384. }}
  1385. svc.Spec.LoadBalancerSourceRanges = []string{
  1386. svcLBSource,
  1387. }
  1388. }),
  1389. )
  1390. makeEndpointsMap(fp,
  1391. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  1392. ept.Subsets = []v1.EndpointSubset{{
  1393. Addresses: []v1.EndpointAddress{{
  1394. IP: epIP,
  1395. NodeName: nil,
  1396. }},
  1397. Ports: []v1.EndpointPort{{
  1398. Name: svcPortName.Port,
  1399. Port: int32(svcPort),
  1400. Protocol: v1.ProtocolTCP,
  1401. }},
  1402. }}
  1403. }),
  1404. )
  1405. fp.syncProxyRules()
  1406. // Check ipvs service and destinations
  1407. epVS := &netlinktest.ExpectedVirtualServer{
  1408. VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP),
  1409. RS: []netlinktest.ExpectedRealServer{{
  1410. IP: epIP, Port: uint16(svcPort),
  1411. }}}
  1412. checkIPVS(t, fp, epVS)
  1413. // Check ipset entry
  1414. epIPSet := netlinktest.ExpectedIPSet{
  1415. kubeLoadBalancerSet: {{
  1416. IP: svcLBIP,
  1417. Port: svcPort,
  1418. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1419. SetType: utilipset.HashIPPort,
  1420. }},
  1421. kubeLoadbalancerFWSet: {{
  1422. IP: svcLBIP,
  1423. Port: svcPort,
  1424. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1425. SetType: utilipset.HashIPPort,
  1426. }},
  1427. kubeLoadBalancerSourceCIDRSet: {{
  1428. IP: svcLBIP,
  1429. Port: svcPort,
  1430. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1431. Net: svcLBSource,
  1432. SetType: utilipset.HashIPPortNet,
  1433. }},
  1434. }
  1435. checkIPSet(t, fp, epIPSet)
  1436. // Check iptables chain and rules
  1437. epIpt := netlinktest.ExpectedIptablesChain{
  1438. string(kubeServicesChain): {{
  1439. JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
  1440. }},
  1441. string(KubeLoadBalancerChain): {{
  1442. JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet,
  1443. }, {
  1444. JumpChain: string(KubeMarkMasqChain), MatchSet: "",
  1445. }},
  1446. string(KubeFireWallChain): {{
  1447. JumpChain: "RETURN", MatchSet: kubeLoadBalancerSourceCIDRSet,
  1448. }, {
  1449. JumpChain: string(KubeMarkDropChain), MatchSet: "",
  1450. }},
  1451. }
  1452. checkIptables(t, ipt, epIpt)
  1453. }
  1454. func TestAcceptIPVSTraffic(t *testing.T) {
  1455. ipt, fp := buildFakeProxier()
  1456. ingressIP := "1.2.3.4"
  1457. externalIP := []string{"5.6.7.8"}
  1458. svcInfos := []struct {
  1459. svcType v1.ServiceType
  1460. svcIP string
  1461. svcName string
  1462. epIP string
  1463. }{
  1464. {v1.ServiceTypeClusterIP, "10.20.30.40", "svc1", "10.180.0.1"},
  1465. {v1.ServiceTypeLoadBalancer, "10.20.30.41", "svc2", "10.180.0.2"},
  1466. {v1.ServiceTypeNodePort, "10.20.30.42", "svc3", "10.180.0.3"},
  1467. }
  1468. for _, svcInfo := range svcInfos {
  1469. makeServiceMap(fp,
  1470. makeTestService("ns1", svcInfo.svcName, func(svc *v1.Service) {
  1471. svc.Spec.Type = svcInfo.svcType
  1472. svc.Spec.ClusterIP = svcInfo.svcIP
  1473. svc.Spec.Ports = []v1.ServicePort{{
  1474. Name: "p80",
  1475. Port: 80,
  1476. Protocol: v1.ProtocolTCP,
  1477. NodePort: 80,
  1478. }}
  1479. if svcInfo.svcType == v1.ServiceTypeLoadBalancer {
  1480. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  1481. IP: ingressIP,
  1482. }}
  1483. }
  1484. if svcInfo.svcType == v1.ServiceTypeClusterIP {
  1485. svc.Spec.ExternalIPs = externalIP
  1486. }
  1487. }),
  1488. )
  1489. makeEndpointsMap(fp,
  1490. makeTestEndpoints("ns1", "p80", func(ept *v1.Endpoints) {
  1491. ept.Subsets = []v1.EndpointSubset{{
  1492. Addresses: []v1.EndpointAddress{{
  1493. IP: svcInfo.epIP,
  1494. }},
  1495. Ports: []v1.EndpointPort{{
  1496. Name: "p80",
  1497. Port: 80,
  1498. }},
  1499. }}
  1500. }),
  1501. )
  1502. }
  1503. fp.syncProxyRules()
  1504. // Check iptables chain and rules
  1505. epIpt := netlinktest.ExpectedIptablesChain{
  1506. string(kubeServicesChain): {
  1507. {JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet},
  1508. {JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet},
  1509. {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
  1510. },
  1511. }
  1512. checkIptables(t, ipt, epIpt)
  1513. }
  1514. func TestOnlyLocalLoadBalancing(t *testing.T) {
  1515. ipt, fp := buildFakeProxier()
  1516. svcIP := "10.20.30.41"
  1517. svcPort := 80
  1518. svcNodePort := 3001
  1519. svcLBIP := "1.2.3.4"
  1520. svcPortName := proxy.ServicePortName{
  1521. NamespacedName: makeNSN("ns1", "svc1"),
  1522. Port: "p80",
  1523. }
  1524. makeServiceMap(fp,
  1525. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1526. svc.Spec.Type = "LoadBalancer"
  1527. svc.Spec.ClusterIP = svcIP
  1528. svc.Spec.Ports = []v1.ServicePort{{
  1529. Name: svcPortName.Port,
  1530. Port: int32(svcPort),
  1531. Protocol: v1.ProtocolTCP,
  1532. NodePort: int32(svcNodePort),
  1533. }}
  1534. svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
  1535. IP: svcLBIP,
  1536. }}
  1537. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1538. }),
  1539. )
  1540. epIP := "10.180.0.1"
  1541. epIP1 := "10.180.1.1"
  1542. thisHostname := testHostname
  1543. otherHostname := "other-hostname"
  1544. makeEndpointsMap(fp,
  1545. makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
  1546. ept.Subsets = []v1.EndpointSubset{
  1547. { // **local** endpoint address, should be added as RS
  1548. Addresses: []v1.EndpointAddress{{
  1549. IP: epIP,
  1550. NodeName: &thisHostname,
  1551. }},
  1552. Ports: []v1.EndpointPort{{
  1553. Name: svcPortName.Port,
  1554. Port: int32(svcPort),
  1555. Protocol: v1.ProtocolTCP,
  1556. }}},
  1557. { // **remote** endpoint address, should not be added as RS
  1558. Addresses: []v1.EndpointAddress{{
  1559. IP: epIP1,
  1560. NodeName: &otherHostname,
  1561. }},
  1562. Ports: []v1.EndpointPort{{
  1563. Name: svcPortName.Port,
  1564. Port: int32(svcPort),
  1565. Protocol: v1.ProtocolTCP,
  1566. }},
  1567. }}
  1568. }),
  1569. )
  1570. fp.syncProxyRules()
  1571. // Expect 2 services and 1 destination
  1572. epVS := &netlinktest.ExpectedVirtualServer{
  1573. VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(v1.ProtocolTCP),
  1574. RS: []netlinktest.ExpectedRealServer{{
  1575. IP: epIP, Port: uint16(svcPort),
  1576. }}}
  1577. checkIPVS(t, fp, epVS)
  1578. // check ipSet rules
  1579. epIPSet := netlinktest.ExpectedIPSet{
  1580. kubeLoadBalancerSet: {{
  1581. IP: svcLBIP,
  1582. Port: svcPort,
  1583. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1584. SetType: utilipset.HashIPPort,
  1585. }},
  1586. kubeLoadBalancerLocalSet: {{
  1587. IP: svcLBIP,
  1588. Port: svcPort,
  1589. Protocol: strings.ToLower(string(v1.ProtocolTCP)),
  1590. SetType: utilipset.HashIPPort,
  1591. }},
  1592. }
  1593. checkIPSet(t, fp, epIPSet)
  1594. // Check iptables chain and rules
  1595. epIpt := netlinktest.ExpectedIptablesChain{
  1596. string(kubeServicesChain): {{
  1597. JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
  1598. }},
  1599. string(KubeLoadBalancerChain): {{
  1600. JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet,
  1601. }, {
  1602. JumpChain: string(KubeMarkMasqChain), MatchSet: "",
  1603. }},
  1604. }
  1605. checkIptables(t, ipt, epIpt)
  1606. }
  1607. func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port, nodeport int32, targetPort int) []v1.ServicePort {
  1608. svcPort := v1.ServicePort{
  1609. Name: name,
  1610. Protocol: protocol,
  1611. Port: port,
  1612. NodePort: nodeport,
  1613. TargetPort: intstr.FromInt(targetPort),
  1614. }
  1615. return append(array, svcPort)
  1616. }
  1617. func TestBuildServiceMapAddRemove(t *testing.T) {
  1618. ipt := iptablestest.NewFake()
  1619. ipvs := ipvstest.NewFake()
  1620. ipset := ipsettest.NewFake(testIPSetVersion)
  1621. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1622. services := []*v1.Service{
  1623. makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1624. svc.Spec.Type = v1.ServiceTypeClusterIP
  1625. svc.Spec.ClusterIP = "172.16.55.4"
  1626. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1627. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1628. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somesctp", "SCTP", 1236, 6321, 0)
  1629. }),
  1630. makeTestService("somewhere-else", "node-port", func(svc *v1.Service) {
  1631. svc.Spec.Type = v1.ServiceTypeNodePort
  1632. svc.Spec.ClusterIP = "172.16.55.10"
  1633. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
  1634. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
  1635. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpblah", "SCTP", 343, 676, 0)
  1636. }),
  1637. makeTestService("somewhere", "load-balancer", func(svc *v1.Service) {
  1638. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1639. svc.Spec.ClusterIP = "172.16.55.11"
  1640. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1641. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
  1642. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
  1643. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpfoo", "SCTP", 8677, 30063, 7002)
  1644. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1645. Ingress: []v1.LoadBalancerIngress{
  1646. {IP: "10.1.2.4"},
  1647. },
  1648. }
  1649. }),
  1650. makeTestService("somewhere", "only-local-load-balancer", func(svc *v1.Service) {
  1651. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1652. svc.Spec.ClusterIP = "172.16.55.12"
  1653. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1654. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
  1655. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
  1656. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpbaz", "SCTP", 8679, 30065, 7004)
  1657. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1658. Ingress: []v1.LoadBalancerIngress{
  1659. {IP: "10.1.2.3"},
  1660. },
  1661. }
  1662. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1663. svc.Spec.HealthCheckNodePort = 345
  1664. }),
  1665. }
  1666. for i := range services {
  1667. fp.OnServiceAdd(services[i])
  1668. }
  1669. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1670. if len(fp.serviceMap) != 12 {
  1671. t.Errorf("expected service map length 12, got %v", fp.serviceMap)
  1672. }
  1673. // The only-local-loadbalancer ones get added
  1674. if len(result.HCServiceNodePorts) != 1 {
  1675. t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
  1676. } else {
  1677. nsn := makeNSN("somewhere", "only-local-load-balancer")
  1678. if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
  1679. t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
  1680. }
  1681. }
  1682. if len(result.UDPStaleClusterIP) != 0 {
  1683. // Services only added, so nothing stale yet
  1684. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1685. }
  1686. // Remove some stuff
  1687. // oneService is a modification of services[0] with removed first port.
  1688. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
  1689. svc.Spec.Type = v1.ServiceTypeClusterIP
  1690. svc.Spec.ClusterIP = "172.16.55.4"
  1691. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
  1692. })
  1693. fp.OnServiceUpdate(services[0], oneService)
  1694. fp.OnServiceDelete(services[1])
  1695. fp.OnServiceDelete(services[2])
  1696. fp.OnServiceDelete(services[3])
  1697. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1698. if len(fp.serviceMap) != 1 {
  1699. t.Errorf("expected service map length 1, got %v", fp.serviceMap)
  1700. }
  1701. if len(result.HCServiceNodePorts) != 0 {
  1702. t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
  1703. }
  1704. // All services but one were deleted. While you'd expect only the ClusterIPs
  1705. // from the three deleted services here, we still have the ClusterIP for
  1706. // the not-deleted service, because one of it's ServicePorts was deleted.
  1707. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
  1708. if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
  1709. t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.List())
  1710. }
  1711. for _, ip := range expectedStaleUDPServices {
  1712. if !result.UDPStaleClusterIP.Has(ip) {
  1713. t.Errorf("expected stale UDP service service %s", ip)
  1714. }
  1715. }
  1716. }
  1717. func TestBuildServiceMapServiceHeadless(t *testing.T) {
  1718. ipt := iptablestest.NewFake()
  1719. ipvs := ipvstest.NewFake()
  1720. ipset := ipsettest.NewFake(testIPSetVersion)
  1721. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1722. makeServiceMap(fp,
  1723. makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
  1724. svc.Spec.Type = v1.ServiceTypeClusterIP
  1725. svc.Spec.ClusterIP = v1.ClusterIPNone
  1726. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
  1727. }),
  1728. makeTestService("somewhere-else", "headless-without-port", func(svc *v1.Service) {
  1729. svc.Spec.Type = v1.ServiceTypeClusterIP
  1730. svc.Spec.ClusterIP = v1.ClusterIPNone
  1731. }),
  1732. makeTestService("somewhere-else", "headless-sctp", func(svc *v1.Service) {
  1733. svc.Spec.Type = v1.ServiceTypeClusterIP
  1734. svc.Spec.ClusterIP = v1.ClusterIPNone
  1735. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 1235, 0, 0)
  1736. }),
  1737. )
  1738. // Headless service should be ignored
  1739. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1740. if len(fp.serviceMap) != 0 {
  1741. t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
  1742. }
  1743. // No proxied services, so no healthchecks
  1744. if len(result.HCServiceNodePorts) != 0 {
  1745. t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
  1746. }
  1747. if len(result.UDPStaleClusterIP) != 0 {
  1748. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1749. }
  1750. }
  1751. func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
  1752. ipt := iptablestest.NewFake()
  1753. ipvs := ipvstest.NewFake()
  1754. ipset := ipsettest.NewFake(testIPSetVersion)
  1755. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1756. makeServiceMap(fp,
  1757. makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
  1758. svc.Spec.Type = v1.ServiceTypeExternalName
  1759. svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
  1760. svc.Spec.ExternalName = "foo2.bar.com"
  1761. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
  1762. }),
  1763. )
  1764. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1765. if len(fp.serviceMap) != 0 {
  1766. t.Errorf("expected service map length 0, got %v", fp.serviceMap)
  1767. }
  1768. // No proxied services, so no healthchecks
  1769. if len(result.HCServiceNodePorts) != 0 {
  1770. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1771. }
  1772. if len(result.UDPStaleClusterIP) != 0 {
  1773. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
  1774. }
  1775. }
  1776. func TestBuildServiceMapServiceUpdate(t *testing.T) {
  1777. ipt := iptablestest.NewFake()
  1778. ipvs := ipvstest.NewFake()
  1779. ipset := ipsettest.NewFake(testIPSetVersion)
  1780. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  1781. servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1782. svc.Spec.Type = v1.ServiceTypeClusterIP
  1783. svc.Spec.ClusterIP = "172.16.55.4"
  1784. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
  1785. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
  1786. })
  1787. servicev2 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
  1788. svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1789. svc.Spec.ClusterIP = "172.16.55.4"
  1790. svc.Spec.LoadBalancerIP = "5.6.7.8"
  1791. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
  1792. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
  1793. svc.Status.LoadBalancer = v1.LoadBalancerStatus{
  1794. Ingress: []v1.LoadBalancerIngress{
  1795. {IP: "10.1.2.3"},
  1796. },
  1797. }
  1798. svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
  1799. svc.Spec.HealthCheckNodePort = 345
  1800. })
  1801. fp.OnServiceAdd(servicev1)
  1802. result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1803. if len(fp.serviceMap) != 2 {
  1804. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1805. }
  1806. if len(result.HCServiceNodePorts) != 0 {
  1807. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1808. }
  1809. if len(result.UDPStaleClusterIP) != 0 {
  1810. // Services only added, so nothing stale yet
  1811. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1812. }
  1813. // Change service to load-balancer
  1814. fp.OnServiceUpdate(servicev1, servicev2)
  1815. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1816. if len(fp.serviceMap) != 2 {
  1817. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1818. }
  1819. if len(result.HCServiceNodePorts) != 1 {
  1820. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1821. }
  1822. if len(result.UDPStaleClusterIP) != 0 {
  1823. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
  1824. }
  1825. // No change; make sure the service map stays the same and there are
  1826. // no health-check changes
  1827. fp.OnServiceUpdate(servicev2, servicev2)
  1828. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1829. if len(fp.serviceMap) != 2 {
  1830. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1831. }
  1832. if len(result.HCServiceNodePorts) != 1 {
  1833. t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
  1834. }
  1835. if len(result.UDPStaleClusterIP) != 0 {
  1836. t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
  1837. }
  1838. // And back to ClusterIP
  1839. fp.OnServiceUpdate(servicev2, servicev1)
  1840. result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
  1841. if len(fp.serviceMap) != 2 {
  1842. t.Errorf("expected service map length 2, got %v", fp.serviceMap)
  1843. }
  1844. if len(result.HCServiceNodePorts) != 0 {
  1845. t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
  1846. }
  1847. if len(result.UDPStaleClusterIP) != 0 {
  1848. // Services only added, so nothing stale yet
  1849. t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
  1850. }
  1851. }
  1852. func TestSessionAffinity(t *testing.T) {
  1853. ipt := iptablestest.NewFake()
  1854. ipvs := ipvstest.NewFake()
  1855. ipset := ipsettest.NewFake(testIPSetVersion)
  1856. nodeIP := net.ParseIP("100.101.102.103")
  1857. fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil, false)
  1858. svcIP := "10.20.30.41"
  1859. svcPort := 80
  1860. svcNodePort := 3001
  1861. svcExternalIPs := "50.60.70.81"
  1862. svcPortName := proxy.ServicePortName{
  1863. NamespacedName: makeNSN("ns1", "svc1"),
  1864. Port: "p80",
  1865. }
  1866. timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds
  1867. makeServiceMap(fp,
  1868. makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
  1869. svc.Spec.Type = "NodePort"
  1870. svc.Spec.ClusterIP = svcIP
  1871. svc.Spec.ExternalIPs = []string{svcExternalIPs}
  1872. svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
  1873. svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
  1874. ClientIP: &v1.ClientIPConfig{
  1875. TimeoutSeconds: &timeoutSeconds,
  1876. },
  1877. }
  1878. svc.Spec.Ports = []v1.ServicePort{{
  1879. Name: svcPortName.Port,
  1880. Port: int32(svcPort),
  1881. Protocol: v1.ProtocolTCP,
  1882. NodePort: int32(svcNodePort),
  1883. }}
  1884. }),
  1885. )
  1886. makeEndpointsMap(fp)
  1887. fp.syncProxyRules()
  1888. // check ipvs service and destinations
  1889. services, err := ipvs.GetVirtualServers()
  1890. if err != nil {
  1891. t.Errorf("Failed to get ipvs services, err: %v", err)
  1892. }
  1893. for _, svc := range services {
  1894. if svc.Timeout != uint32(v1.DefaultClientIPServiceAffinitySeconds) {
  1895. t.Errorf("Unexpected mismatch ipvs service session affinity timeout: %d, expected: %d", svc.Timeout, v1.DefaultClientIPServiceAffinitySeconds)
  1896. }
  1897. }
  1898. }
  1899. func makeServicePortName(ns, name, port string, protocol v1.Protocol) proxy.ServicePortName {
  1900. return proxy.ServicePortName{
  1901. NamespacedName: makeNSN(ns, name),
  1902. Port: port,
  1903. Protocol: protocol,
  1904. }
  1905. }
  1906. func Test_updateEndpointsMap(t *testing.T) {
  1907. var nodeName = testHostname
  1908. emptyEndpoint := func(ept *v1.Endpoints) {
  1909. ept.Subsets = []v1.EndpointSubset{}
  1910. }
  1911. unnamedPort := func(ept *v1.Endpoints) {
  1912. ept.Subsets = []v1.EndpointSubset{{
  1913. Addresses: []v1.EndpointAddress{{
  1914. IP: "1.1.1.1",
  1915. }},
  1916. Ports: []v1.EndpointPort{{
  1917. Port: 11,
  1918. Protocol: v1.ProtocolUDP,
  1919. }},
  1920. }}
  1921. }
  1922. unnamedPortLocal := func(ept *v1.Endpoints) {
  1923. ept.Subsets = []v1.EndpointSubset{{
  1924. Addresses: []v1.EndpointAddress{{
  1925. IP: "1.1.1.1",
  1926. NodeName: &nodeName,
  1927. }},
  1928. Ports: []v1.EndpointPort{{
  1929. Port: 11,
  1930. Protocol: v1.ProtocolUDP,
  1931. }},
  1932. }}
  1933. }
  1934. namedPortLocal := func(ept *v1.Endpoints) {
  1935. ept.Subsets = []v1.EndpointSubset{{
  1936. Addresses: []v1.EndpointAddress{{
  1937. IP: "1.1.1.1",
  1938. NodeName: &nodeName,
  1939. }},
  1940. Ports: []v1.EndpointPort{{
  1941. Name: "p11",
  1942. Port: 11,
  1943. Protocol: v1.ProtocolUDP,
  1944. }},
  1945. }}
  1946. }
  1947. namedPort := func(ept *v1.Endpoints) {
  1948. ept.Subsets = []v1.EndpointSubset{{
  1949. Addresses: []v1.EndpointAddress{{
  1950. IP: "1.1.1.1",
  1951. }},
  1952. Ports: []v1.EndpointPort{{
  1953. Name: "p11",
  1954. Port: 11,
  1955. Protocol: v1.ProtocolUDP,
  1956. }},
  1957. }}
  1958. }
  1959. namedPortRenamed := func(ept *v1.Endpoints) {
  1960. ept.Subsets = []v1.EndpointSubset{{
  1961. Addresses: []v1.EndpointAddress{{
  1962. IP: "1.1.1.1",
  1963. }},
  1964. Ports: []v1.EndpointPort{{
  1965. Name: "p11-2",
  1966. Port: 11,
  1967. Protocol: v1.ProtocolUDP,
  1968. }},
  1969. }}
  1970. }
  1971. namedPortRenumbered := func(ept *v1.Endpoints) {
  1972. ept.Subsets = []v1.EndpointSubset{{
  1973. Addresses: []v1.EndpointAddress{{
  1974. IP: "1.1.1.1",
  1975. }},
  1976. Ports: []v1.EndpointPort{{
  1977. Name: "p11",
  1978. Port: 22,
  1979. Protocol: v1.ProtocolUDP,
  1980. }},
  1981. }}
  1982. }
  1983. namedPortsLocalNoLocal := func(ept *v1.Endpoints) {
  1984. ept.Subsets = []v1.EndpointSubset{{
  1985. Addresses: []v1.EndpointAddress{{
  1986. IP: "1.1.1.1",
  1987. }, {
  1988. IP: "1.1.1.2",
  1989. NodeName: &nodeName,
  1990. }},
  1991. Ports: []v1.EndpointPort{{
  1992. Name: "p11",
  1993. Port: 11,
  1994. Protocol: v1.ProtocolUDP,
  1995. }, {
  1996. Name: "p12",
  1997. Port: 12,
  1998. Protocol: v1.ProtocolUDP,
  1999. }},
  2000. }}
  2001. }
  2002. multipleSubsets := func(ept *v1.Endpoints) {
  2003. ept.Subsets = []v1.EndpointSubset{{
  2004. Addresses: []v1.EndpointAddress{{
  2005. IP: "1.1.1.1",
  2006. }},
  2007. Ports: []v1.EndpointPort{{
  2008. Name: "p11",
  2009. Port: 11,
  2010. Protocol: v1.ProtocolUDP,
  2011. }},
  2012. }, {
  2013. Addresses: []v1.EndpointAddress{{
  2014. IP: "1.1.1.2",
  2015. }},
  2016. Ports: []v1.EndpointPort{{
  2017. Name: "p12",
  2018. Port: 12,
  2019. Protocol: v1.ProtocolUDP,
  2020. }},
  2021. }}
  2022. }
  2023. multipleSubsetsWithLocal := func(ept *v1.Endpoints) {
  2024. ept.Subsets = []v1.EndpointSubset{{
  2025. Addresses: []v1.EndpointAddress{{
  2026. IP: "1.1.1.1",
  2027. }},
  2028. Ports: []v1.EndpointPort{{
  2029. Name: "p11",
  2030. Port: 11,
  2031. Protocol: v1.ProtocolUDP,
  2032. }},
  2033. }, {
  2034. Addresses: []v1.EndpointAddress{{
  2035. IP: "1.1.1.2",
  2036. NodeName: &nodeName,
  2037. }},
  2038. Ports: []v1.EndpointPort{{
  2039. Name: "p12",
  2040. Port: 12,
  2041. Protocol: v1.ProtocolUDP,
  2042. }},
  2043. }}
  2044. }
  2045. multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) {
  2046. ept.Subsets = []v1.EndpointSubset{{
  2047. Addresses: []v1.EndpointAddress{{
  2048. IP: "1.1.1.1",
  2049. NodeName: &nodeName,
  2050. }},
  2051. Ports: []v1.EndpointPort{{
  2052. Name: "p11",
  2053. Port: 11,
  2054. Protocol: v1.ProtocolUDP,
  2055. }, {
  2056. Name: "p12",
  2057. Port: 12,
  2058. Protocol: v1.ProtocolUDP,
  2059. }},
  2060. }, {
  2061. Addresses: []v1.EndpointAddress{{
  2062. IP: "1.1.1.3",
  2063. }},
  2064. Ports: []v1.EndpointPort{{
  2065. Name: "p13",
  2066. Port: 13,
  2067. Protocol: v1.ProtocolUDP,
  2068. }},
  2069. }}
  2070. }
  2071. multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) {
  2072. ept.Subsets = []v1.EndpointSubset{{
  2073. Addresses: []v1.EndpointAddress{{
  2074. IP: "1.1.1.1",
  2075. }, {
  2076. IP: "1.1.1.2",
  2077. NodeName: &nodeName,
  2078. }},
  2079. Ports: []v1.EndpointPort{{
  2080. Name: "p11",
  2081. Port: 11,
  2082. Protocol: v1.ProtocolUDP,
  2083. }, {
  2084. Name: "p12",
  2085. Port: 12,
  2086. Protocol: v1.ProtocolUDP,
  2087. }},
  2088. }, {
  2089. Addresses: []v1.EndpointAddress{{
  2090. IP: "1.1.1.3",
  2091. }, {
  2092. IP: "1.1.1.4",
  2093. NodeName: &nodeName,
  2094. }},
  2095. Ports: []v1.EndpointPort{{
  2096. Name: "p13",
  2097. Port: 13,
  2098. Protocol: v1.ProtocolUDP,
  2099. }, {
  2100. Name: "p14",
  2101. Port: 14,
  2102. Protocol: v1.ProtocolUDP,
  2103. }},
  2104. }}
  2105. }
  2106. multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) {
  2107. ept.Subsets = []v1.EndpointSubset{{
  2108. Addresses: []v1.EndpointAddress{{
  2109. IP: "2.2.2.1",
  2110. }, {
  2111. IP: "2.2.2.2",
  2112. NodeName: &nodeName,
  2113. }},
  2114. Ports: []v1.EndpointPort{{
  2115. Name: "p21",
  2116. Port: 21,
  2117. Protocol: v1.ProtocolUDP,
  2118. }, {
  2119. Name: "p22",
  2120. Port: 22,
  2121. Protocol: v1.ProtocolUDP,
  2122. }},
  2123. }}
  2124. }
  2125. complexBefore1 := func(ept *v1.Endpoints) {
  2126. ept.Subsets = []v1.EndpointSubset{{
  2127. Addresses: []v1.EndpointAddress{{
  2128. IP: "1.1.1.1",
  2129. }},
  2130. Ports: []v1.EndpointPort{{
  2131. Name: "p11",
  2132. Port: 11,
  2133. Protocol: v1.ProtocolUDP,
  2134. }},
  2135. }}
  2136. }
  2137. complexBefore2 := func(ept *v1.Endpoints) {
  2138. ept.Subsets = []v1.EndpointSubset{{
  2139. Addresses: []v1.EndpointAddress{{
  2140. IP: "2.2.2.2",
  2141. NodeName: &nodeName,
  2142. }, {
  2143. IP: "2.2.2.22",
  2144. NodeName: &nodeName,
  2145. }},
  2146. Ports: []v1.EndpointPort{{
  2147. Name: "p22",
  2148. Port: 22,
  2149. Protocol: v1.ProtocolUDP,
  2150. }},
  2151. }, {
  2152. Addresses: []v1.EndpointAddress{{
  2153. IP: "2.2.2.3",
  2154. NodeName: &nodeName,
  2155. }},
  2156. Ports: []v1.EndpointPort{{
  2157. Name: "p23",
  2158. Port: 23,
  2159. Protocol: v1.ProtocolUDP,
  2160. }},
  2161. }}
  2162. }
  2163. complexBefore4 := func(ept *v1.Endpoints) {
  2164. ept.Subsets = []v1.EndpointSubset{{
  2165. Addresses: []v1.EndpointAddress{{
  2166. IP: "4.4.4.4",
  2167. NodeName: &nodeName,
  2168. }, {
  2169. IP: "4.4.4.5",
  2170. NodeName: &nodeName,
  2171. }},
  2172. Ports: []v1.EndpointPort{{
  2173. Name: "p44",
  2174. Port: 44,
  2175. Protocol: v1.ProtocolUDP,
  2176. }},
  2177. }, {
  2178. Addresses: []v1.EndpointAddress{{
  2179. IP: "4.4.4.6",
  2180. NodeName: &nodeName,
  2181. }},
  2182. Ports: []v1.EndpointPort{{
  2183. Name: "p45",
  2184. Port: 45,
  2185. Protocol: v1.ProtocolUDP,
  2186. }},
  2187. }}
  2188. }
  2189. complexAfter1 := func(ept *v1.Endpoints) {
  2190. ept.Subsets = []v1.EndpointSubset{{
  2191. Addresses: []v1.EndpointAddress{{
  2192. IP: "1.1.1.1",
  2193. }, {
  2194. IP: "1.1.1.11",
  2195. }},
  2196. Ports: []v1.EndpointPort{{
  2197. Name: "p11",
  2198. Port: 11,
  2199. Protocol: v1.ProtocolUDP,
  2200. }},
  2201. }, {
  2202. Addresses: []v1.EndpointAddress{{
  2203. IP: "1.1.1.2",
  2204. }},
  2205. Ports: []v1.EndpointPort{{
  2206. Name: "p12",
  2207. Port: 12,
  2208. Protocol: v1.ProtocolUDP,
  2209. }, {
  2210. Name: "p122",
  2211. Port: 122,
  2212. Protocol: v1.ProtocolUDP,
  2213. }},
  2214. }}
  2215. }
  2216. complexAfter3 := func(ept *v1.Endpoints) {
  2217. ept.Subsets = []v1.EndpointSubset{{
  2218. Addresses: []v1.EndpointAddress{{
  2219. IP: "3.3.3.3",
  2220. }},
  2221. Ports: []v1.EndpointPort{{
  2222. Name: "p33",
  2223. Port: 33,
  2224. Protocol: v1.ProtocolUDP,
  2225. }},
  2226. }}
  2227. }
  2228. complexAfter4 := func(ept *v1.Endpoints) {
  2229. ept.Subsets = []v1.EndpointSubset{{
  2230. Addresses: []v1.EndpointAddress{{
  2231. IP: "4.4.4.4",
  2232. NodeName: &nodeName,
  2233. }},
  2234. Ports: []v1.EndpointPort{{
  2235. Name: "p44",
  2236. Port: 44,
  2237. Protocol: v1.ProtocolUDP,
  2238. }},
  2239. }}
  2240. }
  2241. testCases := []struct {
  2242. // previousEndpoints and currentEndpoints are used to call appropriate
  2243. // handlers OnEndpoints* (based on whether corresponding values are nil
  2244. // or non-nil) and must be of equal length.
  2245. previousEndpoints []*v1.Endpoints
  2246. currentEndpoints []*v1.Endpoints
  2247. oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
  2248. expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
  2249. expectedStaleEndpoints []proxy.ServiceEndpoint
  2250. expectedStaleServiceNames map[proxy.ServicePortName]bool
  2251. expectedHealthchecks map[types.NamespacedName]int
  2252. }{{
  2253. // Case[0]: nothing
  2254. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
  2255. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
  2256. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2257. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2258. expectedHealthchecks: map[types.NamespacedName]int{},
  2259. }, {
  2260. // Case[1]: no change, unnamed port
  2261. previousEndpoints: []*v1.Endpoints{
  2262. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2263. },
  2264. currentEndpoints: []*v1.Endpoints{
  2265. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2266. },
  2267. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2268. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2269. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2270. },
  2271. },
  2272. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2273. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2274. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2275. },
  2276. },
  2277. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2278. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2279. expectedHealthchecks: map[types.NamespacedName]int{},
  2280. }, {
  2281. // Case[2]: no change, named port, local
  2282. previousEndpoints: []*v1.Endpoints{
  2283. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  2284. },
  2285. currentEndpoints: []*v1.Endpoints{
  2286. makeTestEndpoints("ns1", "ep1", namedPortLocal),
  2287. },
  2288. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2289. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2290. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2291. },
  2292. },
  2293. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2294. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2295. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2296. },
  2297. },
  2298. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2299. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2300. expectedHealthchecks: map[types.NamespacedName]int{
  2301. makeNSN("ns1", "ep1"): 1,
  2302. },
  2303. }, {
  2304. // Case[3]: no change, multiple subsets
  2305. previousEndpoints: []*v1.Endpoints{
  2306. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  2307. },
  2308. currentEndpoints: []*v1.Endpoints{
  2309. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  2310. },
  2311. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2312. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2313. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2314. },
  2315. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2316. {Endpoint: "1.1.1.2:12", IsLocal: false},
  2317. },
  2318. },
  2319. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2320. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2321. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2322. },
  2323. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2324. {Endpoint: "1.1.1.2:12", IsLocal: false},
  2325. },
  2326. },
  2327. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2328. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2329. expectedHealthchecks: map[types.NamespacedName]int{},
  2330. }, {
  2331. // Case[4]: no change, multiple subsets, multiple ports, local
  2332. previousEndpoints: []*v1.Endpoints{
  2333. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  2334. },
  2335. currentEndpoints: []*v1.Endpoints{
  2336. makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
  2337. },
  2338. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2339. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2340. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2341. },
  2342. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2343. {Endpoint: "1.1.1.1:12", IsLocal: true},
  2344. },
  2345. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  2346. {Endpoint: "1.1.1.3:13", IsLocal: false},
  2347. },
  2348. },
  2349. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2350. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2351. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2352. },
  2353. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2354. {Endpoint: "1.1.1.1:12", IsLocal: true},
  2355. },
  2356. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  2357. {Endpoint: "1.1.1.3:13", IsLocal: false},
  2358. },
  2359. },
  2360. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2361. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2362. expectedHealthchecks: map[types.NamespacedName]int{
  2363. makeNSN("ns1", "ep1"): 1,
  2364. },
  2365. }, {
  2366. // Case[5]: no change, multiple endpoints, subsets, IPs, and ports
  2367. previousEndpoints: []*v1.Endpoints{
  2368. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  2369. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  2370. },
  2371. currentEndpoints: []*v1.Endpoints{
  2372. makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
  2373. makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
  2374. },
  2375. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2376. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2377. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2378. {Endpoint: "1.1.1.2:11", IsLocal: true},
  2379. },
  2380. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2381. {Endpoint: "1.1.1.1:12", IsLocal: false},
  2382. {Endpoint: "1.1.1.2:12", IsLocal: true},
  2383. },
  2384. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  2385. {Endpoint: "1.1.1.3:13", IsLocal: false},
  2386. {Endpoint: "1.1.1.4:13", IsLocal: true},
  2387. },
  2388. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  2389. {Endpoint: "1.1.1.3:14", IsLocal: false},
  2390. {Endpoint: "1.1.1.4:14", IsLocal: true},
  2391. },
  2392. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  2393. {Endpoint: "2.2.2.1:21", IsLocal: false},
  2394. {Endpoint: "2.2.2.2:21", IsLocal: true},
  2395. },
  2396. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  2397. {Endpoint: "2.2.2.1:22", IsLocal: false},
  2398. {Endpoint: "2.2.2.2:22", IsLocal: true},
  2399. },
  2400. },
  2401. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2402. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2403. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2404. {Endpoint: "1.1.1.2:11", IsLocal: true},
  2405. },
  2406. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2407. {Endpoint: "1.1.1.1:12", IsLocal: false},
  2408. {Endpoint: "1.1.1.2:12", IsLocal: true},
  2409. },
  2410. makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): {
  2411. {Endpoint: "1.1.1.3:13", IsLocal: false},
  2412. {Endpoint: "1.1.1.4:13", IsLocal: true},
  2413. },
  2414. makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): {
  2415. {Endpoint: "1.1.1.3:14", IsLocal: false},
  2416. {Endpoint: "1.1.1.4:14", IsLocal: true},
  2417. },
  2418. makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): {
  2419. {Endpoint: "2.2.2.1:21", IsLocal: false},
  2420. {Endpoint: "2.2.2.2:21", IsLocal: true},
  2421. },
  2422. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  2423. {Endpoint: "2.2.2.1:22", IsLocal: false},
  2424. {Endpoint: "2.2.2.2:22", IsLocal: true},
  2425. },
  2426. },
  2427. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2428. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2429. expectedHealthchecks: map[types.NamespacedName]int{
  2430. makeNSN("ns1", "ep1"): 2,
  2431. makeNSN("ns2", "ep2"): 1,
  2432. },
  2433. }, {
  2434. // Case[6]: add an Endpoints
  2435. previousEndpoints: []*v1.Endpoints{
  2436. nil,
  2437. },
  2438. currentEndpoints: []*v1.Endpoints{
  2439. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  2440. },
  2441. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
  2442. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2443. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2444. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2445. },
  2446. },
  2447. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2448. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2449. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  2450. },
  2451. expectedHealthchecks: map[types.NamespacedName]int{
  2452. makeNSN("ns1", "ep1"): 1,
  2453. },
  2454. }, {
  2455. // Case[7]: remove an Endpoints
  2456. previousEndpoints: []*v1.Endpoints{
  2457. makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
  2458. },
  2459. currentEndpoints: []*v1.Endpoints{
  2460. nil,
  2461. },
  2462. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2463. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2464. {Endpoint: "1.1.1.1:11", IsLocal: true},
  2465. },
  2466. },
  2467. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
  2468. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2469. Endpoint: "1.1.1.1:11",
  2470. ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
  2471. }},
  2472. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2473. expectedHealthchecks: map[types.NamespacedName]int{},
  2474. }, {
  2475. // Case[8]: add an IP and port
  2476. previousEndpoints: []*v1.Endpoints{
  2477. makeTestEndpoints("ns1", "ep1", namedPort),
  2478. },
  2479. currentEndpoints: []*v1.Endpoints{
  2480. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  2481. },
  2482. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2483. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2484. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2485. },
  2486. },
  2487. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2488. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2489. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2490. {Endpoint: "1.1.1.2:11", IsLocal: true},
  2491. },
  2492. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2493. {Endpoint: "1.1.1.1:12", IsLocal: false},
  2494. {Endpoint: "1.1.1.2:12", IsLocal: true},
  2495. },
  2496. },
  2497. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2498. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2499. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  2500. },
  2501. expectedHealthchecks: map[types.NamespacedName]int{
  2502. makeNSN("ns1", "ep1"): 1,
  2503. },
  2504. }, {
  2505. // Case[9]: remove an IP and port
  2506. previousEndpoints: []*v1.Endpoints{
  2507. makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
  2508. },
  2509. currentEndpoints: []*v1.Endpoints{
  2510. makeTestEndpoints("ns1", "ep1", namedPort),
  2511. },
  2512. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2513. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2514. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2515. {Endpoint: "1.1.1.2:11", IsLocal: true},
  2516. },
  2517. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2518. {Endpoint: "1.1.1.1:12", IsLocal: false},
  2519. {Endpoint: "1.1.1.2:12", IsLocal: true},
  2520. },
  2521. },
  2522. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2523. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2524. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2525. },
  2526. },
  2527. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2528. Endpoint: "1.1.1.2:11",
  2529. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2530. }, {
  2531. Endpoint: "1.1.1.1:12",
  2532. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  2533. }, {
  2534. Endpoint: "1.1.1.2:12",
  2535. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  2536. }},
  2537. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2538. expectedHealthchecks: map[types.NamespacedName]int{},
  2539. }, {
  2540. // Case[10]: add a subset
  2541. previousEndpoints: []*v1.Endpoints{
  2542. makeTestEndpoints("ns1", "ep1", namedPort),
  2543. },
  2544. currentEndpoints: []*v1.Endpoints{
  2545. makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
  2546. },
  2547. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2548. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2549. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2550. },
  2551. },
  2552. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2553. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2554. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2555. },
  2556. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2557. {Endpoint: "1.1.1.2:12", IsLocal: true},
  2558. },
  2559. },
  2560. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2561. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2562. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  2563. },
  2564. expectedHealthchecks: map[types.NamespacedName]int{
  2565. makeNSN("ns1", "ep1"): 1,
  2566. },
  2567. }, {
  2568. // Case[11]: remove a subset
  2569. previousEndpoints: []*v1.Endpoints{
  2570. makeTestEndpoints("ns1", "ep1", multipleSubsets),
  2571. },
  2572. currentEndpoints: []*v1.Endpoints{
  2573. makeTestEndpoints("ns1", "ep1", namedPort),
  2574. },
  2575. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2576. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2577. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2578. },
  2579. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2580. {Endpoint: "1.1.1.2:12", IsLocal: false},
  2581. },
  2582. },
  2583. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2584. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2585. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2586. },
  2587. },
  2588. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2589. Endpoint: "1.1.1.2:12",
  2590. ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
  2591. }},
  2592. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2593. expectedHealthchecks: map[types.NamespacedName]int{},
  2594. }, {
  2595. // Case[12]: rename a port
  2596. previousEndpoints: []*v1.Endpoints{
  2597. makeTestEndpoints("ns1", "ep1", namedPort),
  2598. },
  2599. currentEndpoints: []*v1.Endpoints{
  2600. makeTestEndpoints("ns1", "ep1", namedPortRenamed),
  2601. },
  2602. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2603. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2604. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2605. },
  2606. },
  2607. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2608. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
  2609. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2610. },
  2611. },
  2612. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2613. Endpoint: "1.1.1.1:11",
  2614. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2615. }},
  2616. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2617. makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
  2618. },
  2619. expectedHealthchecks: map[types.NamespacedName]int{},
  2620. }, {
  2621. // Case[13]: renumber a port
  2622. previousEndpoints: []*v1.Endpoints{
  2623. makeTestEndpoints("ns1", "ep1", namedPort),
  2624. },
  2625. currentEndpoints: []*v1.Endpoints{
  2626. makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
  2627. },
  2628. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2629. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2630. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2631. },
  2632. },
  2633. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2634. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2635. {Endpoint: "1.1.1.1:22", IsLocal: false},
  2636. },
  2637. },
  2638. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2639. Endpoint: "1.1.1.1:11",
  2640. ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
  2641. }},
  2642. expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
  2643. expectedHealthchecks: map[types.NamespacedName]int{},
  2644. }, {
  2645. // Case[14]: complex add and remove
  2646. previousEndpoints: []*v1.Endpoints{
  2647. makeTestEndpoints("ns1", "ep1", complexBefore1),
  2648. makeTestEndpoints("ns2", "ep2", complexBefore2),
  2649. nil,
  2650. makeTestEndpoints("ns4", "ep4", complexBefore4),
  2651. },
  2652. currentEndpoints: []*v1.Endpoints{
  2653. makeTestEndpoints("ns1", "ep1", complexAfter1),
  2654. nil,
  2655. makeTestEndpoints("ns3", "ep3", complexAfter3),
  2656. makeTestEndpoints("ns4", "ep4", complexAfter4),
  2657. },
  2658. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2659. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2660. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2661. },
  2662. makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): {
  2663. {Endpoint: "2.2.2.2:22", IsLocal: true},
  2664. {Endpoint: "2.2.2.22:22", IsLocal: true},
  2665. },
  2666. makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): {
  2667. {Endpoint: "2.2.2.3:23", IsLocal: true},
  2668. },
  2669. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2670. {Endpoint: "4.4.4.4:44", IsLocal: true},
  2671. {Endpoint: "4.4.4.5:44", IsLocal: true},
  2672. },
  2673. makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): {
  2674. {Endpoint: "4.4.4.6:45", IsLocal: true},
  2675. },
  2676. },
  2677. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2678. makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
  2679. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2680. {Endpoint: "1.1.1.11:11", IsLocal: false},
  2681. },
  2682. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): {
  2683. {Endpoint: "1.1.1.2:12", IsLocal: false},
  2684. },
  2685. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): {
  2686. {Endpoint: "1.1.1.2:122", IsLocal: false},
  2687. },
  2688. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): {
  2689. {Endpoint: "3.3.3.3:33", IsLocal: false},
  2690. },
  2691. makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): {
  2692. {Endpoint: "4.4.4.4:44", IsLocal: true},
  2693. },
  2694. },
  2695. expectedStaleEndpoints: []proxy.ServiceEndpoint{{
  2696. Endpoint: "2.2.2.2:22",
  2697. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2698. }, {
  2699. Endpoint: "2.2.2.22:22",
  2700. ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
  2701. }, {
  2702. Endpoint: "2.2.2.3:23",
  2703. ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
  2704. }, {
  2705. Endpoint: "4.4.4.5:44",
  2706. ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
  2707. }, {
  2708. Endpoint: "4.4.4.6:45",
  2709. ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
  2710. }},
  2711. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2712. makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
  2713. makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
  2714. makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
  2715. },
  2716. expectedHealthchecks: map[types.NamespacedName]int{
  2717. makeNSN("ns4", "ep4"): 1,
  2718. },
  2719. }, {
  2720. // Case[15]: change from 0 endpoint address to 1 unnamed port
  2721. previousEndpoints: []*v1.Endpoints{
  2722. makeTestEndpoints("ns1", "ep1", emptyEndpoint),
  2723. },
  2724. currentEndpoints: []*v1.Endpoints{
  2725. makeTestEndpoints("ns1", "ep1", unnamedPort),
  2726. },
  2727. oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
  2728. expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{
  2729. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
  2730. {Endpoint: "1.1.1.1:11", IsLocal: false},
  2731. },
  2732. },
  2733. expectedStaleEndpoints: []proxy.ServiceEndpoint{},
  2734. expectedStaleServiceNames: map[proxy.ServicePortName]bool{
  2735. makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
  2736. },
  2737. expectedHealthchecks: map[types.NamespacedName]int{},
  2738. },
  2739. }
  2740. for tci, tc := range testCases {
  2741. ipt := iptablestest.NewFake()
  2742. ipvs := ipvstest.NewFake()
  2743. ipset := ipsettest.NewFake(testIPSetVersion)
  2744. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  2745. fp.hostname = nodeName
  2746. // First check that after adding all previous versions of endpoints,
  2747. // the fp.oldEndpoints is as we expect.
  2748. for i := range tc.previousEndpoints {
  2749. if tc.previousEndpoints[i] != nil {
  2750. fp.OnEndpointsAdd(tc.previousEndpoints[i])
  2751. }
  2752. }
  2753. fp.endpointsMap.Update(fp.endpointsChanges)
  2754. compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
  2755. // Now let's call appropriate handlers to get to state we want to be.
  2756. if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
  2757. t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
  2758. continue
  2759. }
  2760. for i := range tc.previousEndpoints {
  2761. prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
  2762. switch {
  2763. case prev == nil:
  2764. fp.OnEndpointsAdd(curr)
  2765. case curr == nil:
  2766. fp.OnEndpointsDelete(prev)
  2767. default:
  2768. fp.OnEndpointsUpdate(prev, curr)
  2769. }
  2770. }
  2771. result := fp.endpointsMap.Update(fp.endpointsChanges)
  2772. newMap := fp.endpointsMap
  2773. compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
  2774. if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
  2775. t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
  2776. }
  2777. for _, x := range tc.expectedStaleEndpoints {
  2778. found := false
  2779. for _, stale := range result.StaleEndpoints {
  2780. if stale == x {
  2781. found = true
  2782. break
  2783. }
  2784. }
  2785. if !found {
  2786. t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
  2787. }
  2788. }
  2789. if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
  2790. t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
  2791. }
  2792. for svcName := range tc.expectedStaleServiceNames {
  2793. found := false
  2794. for _, stale := range result.StaleServiceNames {
  2795. if stale == svcName {
  2796. found = true
  2797. break
  2798. }
  2799. }
  2800. if !found {
  2801. t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
  2802. }
  2803. }
  2804. if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
  2805. t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
  2806. }
  2807. }
  2808. }
  2809. func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*proxy.BaseEndpointInfo) {
  2810. if len(newMap) != len(expected) {
  2811. t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
  2812. }
  2813. for x := range expected {
  2814. if len(newMap[x]) != len(expected[x]) {
  2815. t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
  2816. } else {
  2817. for i := range expected[x] {
  2818. newEp, ok := newMap[x][i].(*proxy.BaseEndpointInfo)
  2819. if !ok {
  2820. t.Errorf("Failed to cast proxy.BaseEndpointInfo")
  2821. continue
  2822. }
  2823. if !reflect.DeepEqual(*newEp, *(expected[x][i])) {
  2824. t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
  2825. }
  2826. }
  2827. }
  2828. }
  2829. }
  2830. func Test_syncService(t *testing.T) {
  2831. testCases := []struct {
  2832. oldVirtualServer *utilipvs.VirtualServer
  2833. svcName string
  2834. newVirtualServer *utilipvs.VirtualServer
  2835. bindAddr bool
  2836. }{
  2837. {
  2838. // case 0, old virtual server is same as new virtual server
  2839. oldVirtualServer: &utilipvs.VirtualServer{
  2840. Address: net.ParseIP("1.2.3.4"),
  2841. Protocol: string(v1.ProtocolTCP),
  2842. Port: 80,
  2843. Scheduler: "rr",
  2844. Flags: utilipvs.FlagHashed,
  2845. },
  2846. svcName: "foo",
  2847. newVirtualServer: &utilipvs.VirtualServer{
  2848. Address: net.ParseIP("1.2.3.4"),
  2849. Protocol: string(v1.ProtocolTCP),
  2850. Port: 80,
  2851. Scheduler: "rr",
  2852. Flags: utilipvs.FlagHashed,
  2853. },
  2854. bindAddr: false,
  2855. },
  2856. {
  2857. // case 1, old virtual server is different from new virtual server
  2858. oldVirtualServer: &utilipvs.VirtualServer{
  2859. Address: net.ParseIP("1.2.3.4"),
  2860. Protocol: string(v1.ProtocolTCP),
  2861. Port: 8080,
  2862. Scheduler: "rr",
  2863. Flags: utilipvs.FlagHashed,
  2864. },
  2865. svcName: "bar",
  2866. newVirtualServer: &utilipvs.VirtualServer{
  2867. Address: net.ParseIP("1.2.3.4"),
  2868. Protocol: string(v1.ProtocolTCP),
  2869. Port: 8080,
  2870. Scheduler: "rr",
  2871. Flags: utilipvs.FlagPersistent,
  2872. },
  2873. bindAddr: false,
  2874. },
  2875. {
  2876. // case 2, old virtual server is different from new virtual server
  2877. oldVirtualServer: &utilipvs.VirtualServer{
  2878. Address: net.ParseIP("1.2.3.4"),
  2879. Protocol: string(v1.ProtocolTCP),
  2880. Port: 8080,
  2881. Scheduler: "rr",
  2882. Flags: utilipvs.FlagHashed,
  2883. },
  2884. svcName: "bar",
  2885. newVirtualServer: &utilipvs.VirtualServer{
  2886. Address: net.ParseIP("1.2.3.4"),
  2887. Protocol: string(v1.ProtocolTCP),
  2888. Port: 8080,
  2889. Scheduler: "wlc",
  2890. Flags: utilipvs.FlagHashed,
  2891. },
  2892. bindAddr: false,
  2893. },
  2894. {
  2895. // case 3, old virtual server is nil, and create new virtual server
  2896. oldVirtualServer: nil,
  2897. svcName: "baz",
  2898. newVirtualServer: &utilipvs.VirtualServer{
  2899. Address: net.ParseIP("1.2.3.4"),
  2900. Protocol: string(v1.ProtocolUDP),
  2901. Port: 53,
  2902. Scheduler: "rr",
  2903. Flags: utilipvs.FlagHashed,
  2904. },
  2905. bindAddr: true,
  2906. },
  2907. {
  2908. // case 4, SCTP, old virtual server is same as new virtual server
  2909. oldVirtualServer: &utilipvs.VirtualServer{
  2910. Address: net.ParseIP("1.2.3.4"),
  2911. Protocol: string(v1.ProtocolSCTP),
  2912. Port: 80,
  2913. Scheduler: "rr",
  2914. Flags: utilipvs.FlagHashed,
  2915. },
  2916. svcName: "foo",
  2917. newVirtualServer: &utilipvs.VirtualServer{
  2918. Address: net.ParseIP("1.2.3.4"),
  2919. Protocol: string(v1.ProtocolSCTP),
  2920. Port: 80,
  2921. Scheduler: "rr",
  2922. Flags: utilipvs.FlagHashed,
  2923. },
  2924. bindAddr: false,
  2925. },
  2926. {
  2927. // case 5, old virtual server is different from new virtual server
  2928. oldVirtualServer: &utilipvs.VirtualServer{
  2929. Address: net.ParseIP("1.2.3.4"),
  2930. Protocol: string(v1.ProtocolSCTP),
  2931. Port: 8080,
  2932. Scheduler: "rr",
  2933. Flags: utilipvs.FlagHashed,
  2934. },
  2935. svcName: "bar",
  2936. newVirtualServer: &utilipvs.VirtualServer{
  2937. Address: net.ParseIP("1.2.3.4"),
  2938. Protocol: string(v1.ProtocolSCTP),
  2939. Port: 8080,
  2940. Scheduler: "rr",
  2941. Flags: utilipvs.FlagPersistent,
  2942. },
  2943. bindAddr: false,
  2944. },
  2945. {
  2946. // case 6, old virtual server is different from new virtual server
  2947. oldVirtualServer: &utilipvs.VirtualServer{
  2948. Address: net.ParseIP("1.2.3.4"),
  2949. Protocol: string(v1.ProtocolSCTP),
  2950. Port: 8080,
  2951. Scheduler: "rr",
  2952. Flags: utilipvs.FlagHashed,
  2953. },
  2954. svcName: "bar",
  2955. newVirtualServer: &utilipvs.VirtualServer{
  2956. Address: net.ParseIP("1.2.3.4"),
  2957. Protocol: string(v1.ProtocolSCTP),
  2958. Port: 8080,
  2959. Scheduler: "wlc",
  2960. Flags: utilipvs.FlagHashed,
  2961. },
  2962. bindAddr: false,
  2963. },
  2964. {
  2965. // case 7, old virtual server is nil, and create new virtual server
  2966. oldVirtualServer: nil,
  2967. svcName: "baz",
  2968. newVirtualServer: &utilipvs.VirtualServer{
  2969. Address: net.ParseIP("1.2.3.4"),
  2970. Protocol: string(v1.ProtocolSCTP),
  2971. Port: 53,
  2972. Scheduler: "rr",
  2973. Flags: utilipvs.FlagHashed,
  2974. },
  2975. bindAddr: true,
  2976. },
  2977. }
  2978. for i := range testCases {
  2979. ipt := iptablestest.NewFake()
  2980. ipvs := ipvstest.NewFake()
  2981. ipset := ipsettest.NewFake(testIPSetVersion)
  2982. proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  2983. proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  2984. if testCases[i].oldVirtualServer != nil {
  2985. if err := proxier.ipvs.AddVirtualServer(testCases[i].oldVirtualServer); err != nil {
  2986. t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err)
  2987. }
  2988. }
  2989. if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr); err != nil {
  2990. t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err)
  2991. }
  2992. // check
  2993. list, err := proxier.ipvs.GetVirtualServers()
  2994. if err != nil {
  2995. t.Errorf("Case [%d], unexpected list IPVS virtual server error: %v", i, err)
  2996. }
  2997. if len(list) != 1 {
  2998. t.Errorf("Case [%d], expect %d virtual servers, got %d", i, 1, len(list))
  2999. continue
  3000. }
  3001. if !list[0].Equal(testCases[i].newVirtualServer) {
  3002. t.Errorf("Case [%d], unexpected mismatch, expect: %#v, got: %#v", i, testCases[i].newVirtualServer, list[0])
  3003. }
  3004. }
  3005. }
  3006. func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) {
  3007. ipt := iptablestest.NewFake()
  3008. ipvs := ipvstest.NewFake()
  3009. ipset := ipsettest.NewFake(testIPSetVersion)
  3010. return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  3011. }
  3012. func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool {
  3013. for _, r := range rules {
  3014. if r[iptablestest.Jump] == destChain {
  3015. if ipSet == "" {
  3016. return true
  3017. }
  3018. if strings.Contains(r[iptablestest.MatchSet], ipSet) {
  3019. return true
  3020. }
  3021. }
  3022. }
  3023. return false
  3024. }
  3025. func hasMasqRandomFully(rules []iptablestest.Rule) bool {
  3026. for _, r := range rules {
  3027. if r[iptablestest.Masquerade] == "--random-fully" {
  3028. return true
  3029. }
  3030. }
  3031. return false
  3032. }
  3033. // checkIptabless to check expected iptables chain and rules
  3034. func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
  3035. for epChain, epRules := range epIpt {
  3036. rules := ipt.GetRules(epChain)
  3037. for _, epRule := range epRules {
  3038. if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
  3039. t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
  3040. }
  3041. }
  3042. }
  3043. }
  3044. // checkIPSet to check expected ipset and entries
  3045. func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
  3046. for set, entries := range ipSet {
  3047. ents, err := fp.ipset.ListEntries(set)
  3048. if err != nil || len(ents) != len(entries) {
  3049. t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
  3050. continue
  3051. }
  3052. expectedEntries := []string{}
  3053. for _, entry := range entries {
  3054. expectedEntries = append(expectedEntries, entry.String())
  3055. }
  3056. sort.Strings(ents)
  3057. sort.Strings(expectedEntries)
  3058. if !reflect.DeepEqual(ents, expectedEntries) {
  3059. t.Errorf("Check ipset entries failed for ipset: %q", set)
  3060. }
  3061. }
  3062. }
  3063. // checkIPVS to check expected ipvs service and destination
  3064. func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) {
  3065. services, err := fp.ipvs.GetVirtualServers()
  3066. if err != nil {
  3067. t.Errorf("Failed to get ipvs services, err: %v", err)
  3068. }
  3069. if len(services) != vs.VSNum {
  3070. t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services))
  3071. }
  3072. for _, svc := range services {
  3073. if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol {
  3074. destinations, _ := fp.ipvs.GetRealServers(svc)
  3075. if len(destinations) != len(vs.RS) {
  3076. t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations))
  3077. }
  3078. if len(vs.RS) == 1 {
  3079. if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port {
  3080. t.Errorf("Unexpected mismatch destinations")
  3081. }
  3082. }
  3083. }
  3084. }
  3085. }
  3086. func TestCleanLegacyService(t *testing.T) {
  3087. ipt := iptablestest.NewFake()
  3088. ipvs := ipvstest.NewFake()
  3089. ipset := ipsettest.NewFake(testIPSetVersion)
  3090. fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"}), false)
  3091. // All ipvs services that were processed in the latest sync loop.
  3092. activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
  3093. // All ipvs services in the system.
  3094. currentServices := map[string]*utilipvs.VirtualServer{
  3095. // Created by kube-proxy.
  3096. "ipvs0": {
  3097. Address: net.ParseIP("1.1.1.1"),
  3098. Protocol: string(v1.ProtocolUDP),
  3099. Port: 53,
  3100. Scheduler: "rr",
  3101. Flags: utilipvs.FlagHashed,
  3102. },
  3103. // Created by kube-proxy.
  3104. "ipvs1": {
  3105. Address: net.ParseIP("2.2.2.2"),
  3106. Protocol: string(v1.ProtocolUDP),
  3107. Port: 54,
  3108. Scheduler: "rr",
  3109. Flags: utilipvs.FlagHashed,
  3110. },
  3111. // Created by an external party.
  3112. "ipvs2": {
  3113. Address: net.ParseIP("3.3.3.3"),
  3114. Protocol: string(v1.ProtocolUDP),
  3115. Port: 55,
  3116. Scheduler: "rr",
  3117. Flags: utilipvs.FlagHashed,
  3118. },
  3119. // Created by an external party.
  3120. "ipvs3": {
  3121. Address: net.ParseIP("4.4.4.4"),
  3122. Protocol: string(v1.ProtocolUDP),
  3123. Port: 56,
  3124. Scheduler: "rr",
  3125. Flags: utilipvs.FlagHashed,
  3126. },
  3127. // Created by an external party.
  3128. "ipvs4": {
  3129. Address: net.ParseIP("5.5.5.5"),
  3130. Protocol: string(v1.ProtocolUDP),
  3131. Port: 57,
  3132. Scheduler: "rr",
  3133. Flags: utilipvs.FlagHashed,
  3134. },
  3135. // Created by kube-proxy, but now stale.
  3136. "ipvs5": {
  3137. Address: net.ParseIP("6.6.6.6"),
  3138. Protocol: string(v1.ProtocolUDP),
  3139. Port: 58,
  3140. Scheduler: "rr",
  3141. Flags: utilipvs.FlagHashed,
  3142. },
  3143. }
  3144. for v := range currentServices {
  3145. fp.ipvs.AddVirtualServer(currentServices[v])
  3146. }
  3147. fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  3148. activeBindAddrs := map[string]bool{"1.1.1.1": true, "2.2.2.2": true, "3.3.3.3": true, "4.4.4.4": true}
  3149. // This is ipv4-only so ipv6 addresses should be ignored
  3150. currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5", "6.6.6.6", "fd80::1:2:3", "fd80::1:2:4"}
  3151. for i := range currentBindAddrs {
  3152. fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], DefaultDummyDevice)
  3153. }
  3154. fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5.5.5.5": true, "6.6.6.6": true})
  3155. // ipvs4 and ipvs5 should have been cleaned.
  3156. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
  3157. if len(remainingVirtualServers) != 4 {
  3158. t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
  3159. }
  3160. for _, vs := range remainingVirtualServers {
  3161. // Checking that ipvs4 and ipvs5 were removed.
  3162. if vs.Port == 57 {
  3163. t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
  3164. }
  3165. if vs.Port == 58 {
  3166. t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
  3167. }
  3168. }
  3169. // Addresses 5.5.5.5 and 6.6.6.6 should not be bound any more, but the ipv6 addresses should remain
  3170. remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3171. if len(remainingAddrs) != 6 {
  3172. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs))
  3173. }
  3174. // check that address "1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4" are bound, ignore ipv6 addresses
  3175. remainingAddrsMap := make(map[string]bool)
  3176. for _, a := range remainingAddrs {
  3177. if net.ParseIP(a).To4() == nil {
  3178. continue
  3179. }
  3180. remainingAddrsMap[a] = true
  3181. }
  3182. if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
  3183. t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
  3184. }
  3185. }
  3186. func TestCleanLegacyServiceWithRealServers(t *testing.T) {
  3187. ipt := iptablestest.NewFake()
  3188. ipvs := ipvstest.NewFake()
  3189. ipset := ipsettest.NewFake(testIPSetVersion)
  3190. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  3191. // all deleted expect ipvs2
  3192. activeServices := map[string]bool{"ipvs2": true}
  3193. // All ipvs services in the system.
  3194. currentServices := map[string]*utilipvs.VirtualServer{
  3195. "ipvs0": { // deleted with real servers
  3196. Address: net.ParseIP("1.1.1.1"),
  3197. Protocol: string(v1.ProtocolUDP),
  3198. Port: 53,
  3199. Scheduler: "rr",
  3200. Flags: utilipvs.FlagHashed,
  3201. },
  3202. "ipvs1": { // deleted no real server
  3203. Address: net.ParseIP("2.2.2.2"),
  3204. Protocol: string(v1.ProtocolUDP),
  3205. Port: 54,
  3206. Scheduler: "rr",
  3207. Flags: utilipvs.FlagHashed,
  3208. },
  3209. "ipvs2": { // not deleted
  3210. Address: net.ParseIP("3.3.3.3"),
  3211. Protocol: string(v1.ProtocolUDP),
  3212. Port: 54,
  3213. Scheduler: "rr",
  3214. Flags: utilipvs.FlagHashed,
  3215. },
  3216. }
  3217. // "ipvs0" has a real server, but it should still be deleted since the Service is deleted
  3218. realServers := map[*utilipvs.VirtualServer]*utilipvs.RealServer{
  3219. {
  3220. Address: net.ParseIP("1.1.1.1"),
  3221. Protocol: string(v1.ProtocolUDP),
  3222. Port: 53,
  3223. Scheduler: "rr",
  3224. Flags: utilipvs.FlagHashed,
  3225. }: {
  3226. Address: net.ParseIP("10.180.0.1"),
  3227. Port: uint16(53),
  3228. Weight: 1,
  3229. },
  3230. }
  3231. for v := range currentServices {
  3232. fp.ipvs.AddVirtualServer(currentServices[v])
  3233. }
  3234. for v, r := range realServers {
  3235. fp.ipvs.AddRealServer(v, r)
  3236. }
  3237. fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  3238. activeBindAddrs := map[string]bool{"3.3.3.3": true}
  3239. currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}
  3240. for i := range currentBindAddrs {
  3241. fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], DefaultDummyDevice)
  3242. }
  3243. fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"1.1.1.1": true, "2.2.2.2": true})
  3244. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
  3245. if len(remainingVirtualServers) != 1 {
  3246. t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 1, len(remainingVirtualServers))
  3247. }
  3248. if remainingVirtualServers[0] != currentServices["ipvs2"] {
  3249. t.Logf("actual virtual server: %v", remainingVirtualServers[0])
  3250. t.Logf("expected virtual server: %v", currentServices["ipvs0"])
  3251. t.Errorf("unexpected IPVS service")
  3252. }
  3253. remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3254. if len(remainingAddrs) != 1 {
  3255. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
  3256. }
  3257. // check that address is "3.3.3.3"
  3258. remainingAddrsMap := make(map[string]bool)
  3259. for _, a := range remainingAddrs {
  3260. if net.ParseIP(a).To4() == nil {
  3261. continue
  3262. }
  3263. remainingAddrsMap[a] = true
  3264. }
  3265. if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
  3266. t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
  3267. }
  3268. }
  3269. func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
  3270. ipt := iptablestest.NewFake()
  3271. ipvs := ipvstest.NewFake()
  3272. ipset := ipsettest.NewFake(testIPSetVersion)
  3273. gtm := NewGracefulTerminationManager(ipvs)
  3274. fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"4.4.4.4/32"}), false)
  3275. fp.gracefuldeleteManager = gtm
  3276. vs := &utilipvs.VirtualServer{
  3277. Address: net.ParseIP("4.4.4.4"),
  3278. Protocol: string(v1.ProtocolUDP),
  3279. Port: 56,
  3280. Scheduler: "rr",
  3281. Flags: utilipvs.FlagHashed,
  3282. }
  3283. fp.ipvs.AddVirtualServer(vs)
  3284. rss := []*utilipvs.RealServer{
  3285. {
  3286. Address: net.ParseIP("10.10.10.10"),
  3287. Port: 56,
  3288. ActiveConn: 0,
  3289. InactiveConn: 0,
  3290. },
  3291. {
  3292. Address: net.ParseIP("11.11.11.11"),
  3293. Port: 56,
  3294. ActiveConn: 0,
  3295. InactiveConn: 0,
  3296. },
  3297. }
  3298. for _, rs := range rss {
  3299. fp.ipvs.AddRealServer(vs, rs)
  3300. }
  3301. fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  3302. fp.netlinkHandle.EnsureAddressBind("4.4.4.4", DefaultDummyDevice)
  3303. fp.cleanLegacyService(
  3304. map[string]bool{},
  3305. map[string]*utilipvs.VirtualServer{"ipvs0": vs},
  3306. map[string]bool{"4.4.4.4": true},
  3307. )
  3308. fp.gracefuldeleteManager.tryDeleteRs()
  3309. remainingRealServers, _ := fp.ipvs.GetRealServers(vs)
  3310. if len(remainingRealServers) != 2 {
  3311. t.Errorf("Expected number of remaining IPVS real servers after cleanup should be %v. Got %v", 2, len(remainingRealServers))
  3312. }
  3313. }
  3314. func TestCleanLegacyService6(t *testing.T) {
  3315. ipt := iptablestest.NewFake()
  3316. ipvs := ipvstest.NewFake()
  3317. ipset := ipsettest.NewFake(testIPSetVersion)
  3318. fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3000::/64", "4000::/64"}), false)
  3319. fp.nodeIP = net.ParseIP("::1")
  3320. // All ipvs services that were processed in the latest sync loop.
  3321. activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
  3322. // All ipvs services in the system.
  3323. currentServices := map[string]*utilipvs.VirtualServer{
  3324. // Created by kube-proxy.
  3325. "ipvs0": {
  3326. Address: net.ParseIP("1000::1"),
  3327. Protocol: string(v1.ProtocolUDP),
  3328. Port: 53,
  3329. Scheduler: "rr",
  3330. Flags: utilipvs.FlagHashed,
  3331. },
  3332. // Created by kube-proxy.
  3333. "ipvs1": {
  3334. Address: net.ParseIP("1000::2"),
  3335. Protocol: string(v1.ProtocolUDP),
  3336. Port: 54,
  3337. Scheduler: "rr",
  3338. Flags: utilipvs.FlagHashed,
  3339. },
  3340. // Created by an external party.
  3341. "ipvs2": {
  3342. Address: net.ParseIP("3000::1"),
  3343. Protocol: string(v1.ProtocolUDP),
  3344. Port: 55,
  3345. Scheduler: "rr",
  3346. Flags: utilipvs.FlagHashed,
  3347. },
  3348. // Created by an external party.
  3349. "ipvs3": {
  3350. Address: net.ParseIP("4000::1"),
  3351. Protocol: string(v1.ProtocolUDP),
  3352. Port: 56,
  3353. Scheduler: "rr",
  3354. Flags: utilipvs.FlagHashed,
  3355. },
  3356. // Created by an external party.
  3357. "ipvs4": {
  3358. Address: net.ParseIP("5000::1"),
  3359. Protocol: string(v1.ProtocolUDP),
  3360. Port: 57,
  3361. Scheduler: "rr",
  3362. Flags: utilipvs.FlagHashed,
  3363. },
  3364. // Created by kube-proxy, but now stale.
  3365. "ipvs5": {
  3366. Address: net.ParseIP("1000::6"),
  3367. Protocol: string(v1.ProtocolUDP),
  3368. Port: 58,
  3369. Scheduler: "rr",
  3370. Flags: utilipvs.FlagHashed,
  3371. },
  3372. }
  3373. for v := range currentServices {
  3374. fp.ipvs.AddVirtualServer(currentServices[v])
  3375. }
  3376. fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  3377. activeBindAddrs := map[string]bool{"1000::1": true, "1000::2": true, "3000::1": true, "4000::1": true}
  3378. // This is ipv6-only so ipv4 addresses should be ignored
  3379. currentBindAddrs := []string{"1000::1", "1000::2", "3000::1", "4000::1", "5000::1", "1000::6", "1.1.1.1", "2.2.2.2"}
  3380. for i := range currentBindAddrs {
  3381. fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], DefaultDummyDevice)
  3382. }
  3383. fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5000::1": true, "1000::6": true})
  3384. // ipvs4 and ipvs5 should have been cleaned.
  3385. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
  3386. if len(remainingVirtualServers) != 4 {
  3387. t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
  3388. }
  3389. for _, vs := range remainingVirtualServers {
  3390. // Checking that ipvs4 and ipvs5 were removed.
  3391. if vs.Port == 57 {
  3392. t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
  3393. }
  3394. if vs.Port == 58 {
  3395. t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
  3396. }
  3397. }
  3398. // Addresses 5000::1 and 1000::6 should not be bound any more, but the ipv4 addresses should remain
  3399. remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3400. if len(remainingAddrs) != 6 {
  3401. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs))
  3402. }
  3403. // check that address "1000::1", "1000::2", "3000::1", "4000::1" are still bound, ignore ipv4 addresses
  3404. remainingAddrsMap := make(map[string]bool)
  3405. for _, a := range remainingAddrs {
  3406. if net.ParseIP(a).To4() != nil {
  3407. continue
  3408. }
  3409. remainingAddrsMap[a] = true
  3410. }
  3411. if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
  3412. t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
  3413. }
  3414. }
  3415. func TestMultiPortServiceBindAddr(t *testing.T) {
  3416. ipt := iptablestest.NewFake()
  3417. ipvs := ipvstest.NewFake()
  3418. ipset := ipsettest.NewFake(testIPSetVersion)
  3419. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
  3420. service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
  3421. svc.Spec.Type = v1.ServiceTypeClusterIP
  3422. svc.Spec.ClusterIP = "172.16.55.4"
  3423. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
  3424. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0)
  3425. })
  3426. service2 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
  3427. svc.Spec.Type = v1.ServiceTypeClusterIP
  3428. svc.Spec.ClusterIP = "172.16.55.4"
  3429. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
  3430. })
  3431. service3 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
  3432. svc.Spec.Type = v1.ServiceTypeClusterIP
  3433. svc.Spec.ClusterIP = "172.16.55.4"
  3434. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "TCP", 1234, 0, 0)
  3435. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 1235, 0, 0)
  3436. svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port3", "UDP", 1236, 0, 0)
  3437. })
  3438. fp.servicesSynced = true
  3439. fp.endpointsSynced = true
  3440. // first, add multi-port service1
  3441. fp.OnServiceAdd(service1)
  3442. fp.syncProxyRules()
  3443. remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3444. // should only remain address "172.16.55.4"
  3445. if len(remainingAddrs) != 1 {
  3446. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
  3447. }
  3448. if remainingAddrs[0] != "172.16.55.4" {
  3449. t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
  3450. }
  3451. // update multi-port service1 to single-port service2
  3452. fp.OnServiceUpdate(service1, service2)
  3453. fp.syncProxyRules()
  3454. remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3455. // should still only remain address "172.16.55.4"
  3456. if len(remainingAddrs) != 1 {
  3457. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
  3458. } else if remainingAddrs[0] != "172.16.55.4" {
  3459. t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
  3460. }
  3461. // update single-port service2 to multi-port service3
  3462. fp.OnServiceUpdate(service2, service3)
  3463. fp.syncProxyRules()
  3464. remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3465. // should still only remain address "172.16.55.4"
  3466. if len(remainingAddrs) != 1 {
  3467. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
  3468. } else if remainingAddrs[0] != "172.16.55.4" {
  3469. t.Errorf("Expected remaining address should be %s, got %s", "172.16.55.4", remainingAddrs[0])
  3470. }
  3471. // delete multi-port service3
  3472. fp.OnServiceDelete(service3)
  3473. fp.syncProxyRules()
  3474. remainingAddrs, _ = fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  3475. // all addresses should be unbound
  3476. if len(remainingAddrs) != 0 {
  3477. t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 0, len(remainingAddrs))
  3478. }
  3479. }
  3480. func Test_getFirstColumn(t *testing.T) {
  3481. testCases := []struct {
  3482. name string
  3483. fileContent string
  3484. want []string
  3485. wantErr bool
  3486. }{
  3487. {
  3488. name: "valid content",
  3489. fileContent: `libiscsi_tcp 28672 1 iscsi_tcp, Live 0xffffffffc07ae000
  3490. libiscsi 57344 3 ib_iser,iscsi_tcp,libiscsi_tcp, Live 0xffffffffc079a000
  3491. raid10 57344 0 - Live 0xffffffffc0597000`,
  3492. want: []string{"libiscsi_tcp", "libiscsi", "raid10"},
  3493. wantErr: false,
  3494. },
  3495. }
  3496. for _, test := range testCases {
  3497. t.Run(test.name, func(t *testing.T) {
  3498. got, err := getFirstColumn(strings.NewReader(test.fileContent))
  3499. if (err != nil) != test.wantErr {
  3500. t.Errorf("getFirstColumn() error = %v, wantErr %v", err, test.wantErr)
  3501. return
  3502. }
  3503. if !reflect.DeepEqual(got, test.want) {
  3504. t.Errorf("getFirstColumn() = %v, want %v", got, test.want)
  3505. }
  3506. })
  3507. }
  3508. }
  3509. // The majority of EndpointSlice specific tests are not ipvs specific and focus on
  3510. // the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
  3511. // ipvs proxier supports translating EndpointSlices to ipvs output.
  3512. func TestEndpointSliceE2E(t *testing.T) {
  3513. ipt := iptablestest.NewFake()
  3514. ipvs := ipvstest.NewFake()
  3515. ipset := ipsettest.NewFake(testIPSetVersion)
  3516. fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true)
  3517. fp.servicesSynced = true
  3518. fp.endpointsSynced = true
  3519. fp.endpointSlicesSynced = true
  3520. // Add initial service
  3521. serviceName := "svc1"
  3522. namespaceName := "ns1"
  3523. fp.OnServiceAdd(&v1.Service{
  3524. ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
  3525. Spec: v1.ServiceSpec{
  3526. ClusterIP: "172.20.1.1",
  3527. Selector: map[string]string{"foo": "bar"},
  3528. Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
  3529. },
  3530. })
  3531. // Add initial endpoint slice
  3532. tcpProtocol := v1.ProtocolTCP
  3533. endpointSlice := &discovery.EndpointSlice{
  3534. ObjectMeta: metav1.ObjectMeta{
  3535. Name: fmt.Sprintf("%s-1", serviceName),
  3536. Namespace: namespaceName,
  3537. Labels: map[string]string{discovery.LabelServiceName: serviceName},
  3538. },
  3539. Ports: []discovery.EndpointPort{{
  3540. Name: utilpointer.StringPtr(""),
  3541. Port: utilpointer.Int32Ptr(80),
  3542. Protocol: &tcpProtocol,
  3543. }},
  3544. AddressType: discovery.AddressTypeIPv4,
  3545. Endpoints: []discovery.Endpoint{{
  3546. Addresses: []string{"10.0.1.1"},
  3547. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  3548. Topology: map[string]string{"kubernetes.io/hostname": testHostname},
  3549. }, {
  3550. Addresses: []string{"10.0.1.2"},
  3551. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  3552. Topology: map[string]string{"kubernetes.io/hostname": "node2"},
  3553. }, {
  3554. Addresses: []string{"10.0.1.3"},
  3555. Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
  3556. Topology: map[string]string{"kubernetes.io/hostname": "node3"},
  3557. }},
  3558. }
  3559. fp.OnEndpointSliceAdd(endpointSlice)
  3560. fp.syncProxyRules()
  3561. // Ensure that Proxier updates ipvs appropriately after EndpointSlice update
  3562. assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
  3563. activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
  3564. assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK")
  3565. assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
  3566. virtualServers1, vsErr1 := ipvs.GetVirtualServers()
  3567. assert.Nil(t, vsErr1, "Expected no error getting virtual servers")
  3568. assert.Len(t, virtualServers1, 1, "Expected 1 virtual server")
  3569. realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0])
  3570. assert.Nil(t, rsErr1, "Expected no error getting real servers")
  3571. assert.Len(t, realServers1, 3, "Expected 3 real servers")
  3572. assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
  3573. assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
  3574. assert.Equal(t, realServers1[2].String(), "10.0.1.3:80")
  3575. fp.OnEndpointSliceDelete(endpointSlice)
  3576. fp.syncProxyRules()
  3577. // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
  3578. assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
  3579. activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
  3580. assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
  3581. virtualServers2, vsErr2 := ipvs.GetVirtualServers()
  3582. assert.Nil(t, vsErr2, "Expected no error getting virtual servers")
  3583. assert.Len(t, virtualServers2, 1, "Expected 1 virtual server")
  3584. realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0])
  3585. assert.Nil(t, rsErr2, "Expected no error getting real servers")
  3586. assert.Len(t, realServers2, 0, "Expected 0 real servers")
  3587. }
  3588. func TestFilterCIDRs(t *testing.T) {
  3589. var cidrList []string
  3590. var cidrs []string
  3591. var expected []string
  3592. cidrs = filterCIDRs(true, []string{})
  3593. if len(cidrs) > 0 {
  3594. t.Errorf("An empty list produces a non-empty return %v", cidrs)
  3595. }
  3596. cidrList = []string{"1000::/64", "10.0.0.0/16", "11.0.0.0/16", "2000::/64"}
  3597. expected = []string{"1000::/64", "2000::/64"}
  3598. cidrs = filterCIDRs(true, cidrList)
  3599. if !reflect.DeepEqual(cidrs, expected) {
  3600. t.Errorf("cidrs %v is not expected %v", cidrs, expected)
  3601. }
  3602. expected = []string{"10.0.0.0/16", "11.0.0.0/16"}
  3603. cidrs = filterCIDRs(false, cidrList)
  3604. if !reflect.DeepEqual(cidrs, expected) {
  3605. t.Errorf("cidrs %v is not expected %v", cidrs, expected)
  3606. }
  3607. cidrList = []string{"1000::/64", "2000::/64"}
  3608. expected = []string{}
  3609. cidrs = filterCIDRs(false, cidrList)
  3610. if len(cidrs) > 0 {
  3611. t.Errorf("cidrs %v is not expected %v", cidrs, expected)
  3612. }
  3613. }