proxier.go 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "io/ioutil"
  19. "net"
  20. "os"
  21. "reflect"
  22. "regexp"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "k8s.io/klog"
  29. utilexec "k8s.io/utils/exec"
  30. utilnet "k8s.io/utils/net"
  31. v1 "k8s.io/api/core/v1"
  32. discovery "k8s.io/api/discovery/v1beta1"
  33. "k8s.io/apimachinery/pkg/types"
  34. "k8s.io/apimachinery/pkg/util/sets"
  35. "k8s.io/apimachinery/pkg/util/version"
  36. "k8s.io/apimachinery/pkg/util/wait"
  37. utilfeature "k8s.io/apiserver/pkg/util/feature"
  38. "k8s.io/client-go/tools/record"
  39. "k8s.io/kubernetes/pkg/features"
  40. "k8s.io/kubernetes/pkg/proxy"
  41. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  42. "k8s.io/kubernetes/pkg/proxy/metaproxier"
  43. "k8s.io/kubernetes/pkg/proxy/metrics"
  44. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  45. proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
  46. "k8s.io/kubernetes/pkg/util/async"
  47. "k8s.io/kubernetes/pkg/util/conntrack"
  48. utilipset "k8s.io/kubernetes/pkg/util/ipset"
  49. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  50. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  51. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  52. )
  53. const (
  54. // kubeServicesChain is the services portal chain
  55. kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
  56. // KubeFireWallChain is the kubernetes firewall chain.
  57. KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"
  58. // kubePostroutingChain is the kubernetes postrouting chain
  59. kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
  60. // KubeMarkMasqChain is the mark-for-masquerade chain
  61. KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
  62. // KubeNodePortChain is the kubernetes node port chain
  63. KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
  64. // KubeMarkDropChain is the mark-for-drop chain
  65. KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
  66. // KubeForwardChain is the kubernetes forward chain
  67. KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
  68. // KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
  69. KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
  70. // DefaultScheduler is the default ipvs scheduler algorithm - round robin.
  71. DefaultScheduler = "rr"
  72. // DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
  73. DefaultDummyDevice = "kube-ipvs0"
  74. )
  75. // iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
  76. // `to` is the iptables chain we want to operate.
  77. // `from` is the source iptables chain
  78. var iptablesJumpChain = []struct {
  79. table utiliptables.Table
  80. from utiliptables.Chain
  81. to utiliptables.Chain
  82. comment string
  83. }{
  84. {utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
  85. {utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
  86. {utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
  87. {utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
  88. }
  89. var iptablesChains = []struct {
  90. table utiliptables.Table
  91. chain utiliptables.Chain
  92. }{
  93. {utiliptables.TableNAT, kubeServicesChain},
  94. {utiliptables.TableNAT, kubePostroutingChain},
  95. {utiliptables.TableNAT, KubeFireWallChain},
  96. {utiliptables.TableNAT, KubeNodePortChain},
  97. {utiliptables.TableNAT, KubeLoadBalancerChain},
  98. {utiliptables.TableNAT, KubeMarkMasqChain},
  99. {utiliptables.TableNAT, KubeMarkDropChain},
  100. {utiliptables.TableFilter, KubeForwardChain},
  101. }
  102. var iptablesCleanupChains = []struct {
  103. table utiliptables.Table
  104. chain utiliptables.Chain
  105. }{
  106. {utiliptables.TableNAT, kubeServicesChain},
  107. {utiliptables.TableNAT, kubePostroutingChain},
  108. {utiliptables.TableNAT, KubeFireWallChain},
  109. {utiliptables.TableNAT, KubeNodePortChain},
  110. {utiliptables.TableNAT, KubeLoadBalancerChain},
  111. {utiliptables.TableFilter, KubeForwardChain},
  112. }
  113. // ipsetInfo is all ipset we needed in ipvs proxier
  114. var ipsetInfo = []struct {
  115. name string
  116. setType utilipset.Type
  117. comment string
  118. }{
  119. {kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
  120. {kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
  121. {kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
  122. {kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
  123. {kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
  124. {kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
  125. {kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
  126. {kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
  127. {kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
  128. {kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
  129. {kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
  130. {kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
  131. {kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
  132. {kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
  133. }
  134. // ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
  135. // `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
  136. // example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
  137. // ipsets with other match rules will be created Individually.
  138. // Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
  139. var ipsetWithIptablesChain = []struct {
  140. name string
  141. from string
  142. to string
  143. matchType string
  144. protocolMatch string
  145. }{
  146. {kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
  147. {kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""},
  148. {kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""},
  149. {kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  150. {kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  151. {kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""},
  152. {kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", "tcp"},
  153. {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "tcp"},
  154. {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", "udp"},
  155. {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "udp"},
  156. {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", "sctp"},
  157. {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", "sctp"},
  158. }
  159. // In IPVS proxy mode, the following flags need to be set
  160. const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
  161. const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
  162. const sysctlVSConnTrack = "net/ipv4/vs/conntrack"
  163. const sysctlConnReuse = "net/ipv4/vs/conn_reuse_mode"
  164. const sysctlExpireNoDestConn = "net/ipv4/vs/expire_nodest_conn"
  165. const sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
  166. const sysctlForward = "net/ipv4/ip_forward"
  167. const sysctlArpIgnore = "net/ipv4/conf/all/arp_ignore"
  168. const sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce"
// Proxier is an ipvs based proxy for connections between a localhost:lport
// and services that provide the actual backends.
//
// Instances are built by NewProxier. Rule synchronization runs through
// syncRunner, which invokes syncProxyRules.
type Proxier struct {
	// endpointsChanges and serviceChanges contains all changes to endpoints and
	// services that happened since last syncProxyRules call. For a single object,
	// changes are accumulated, i.e. previous is state from before all of them,
	// current is state after applying all of those.
	endpointsChanges *proxy.EndpointChangeTracker
	serviceChanges   *proxy.ServiceChangeTracker

	mu           sync.Mutex // protects the following fields
	serviceMap   proxy.ServiceMap
	endpointsMap proxy.EndpointsMap
	// portsMap holds a Closeable per local port. NOTE(review): presumably the
	// ports opened through portMapper, kept so they can be closed later — confirm.
	portsMap map[utilproxy.LocalPort]utilproxy.Closeable
	// nodeLabels is not initialized by NewProxier. NOTE(review): presumably
	// filled in from node updates elsewhere — confirm.
	nodeLabels map[string]string
	// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
	// corresponding objects are synced after startup. This is used to avoid updating
	// ipvs rules with some partial data after kube-proxy restart.
	endpointsSynced      bool
	endpointSlicesSynced bool
	servicesSynced       bool
	// initialized is an int32 flag. NOTE(review): the int32 type and the
	// file's sync/atomic import suggest atomic access — verify before reading
	// or writing it directly.
	initialized int32
	syncRunner  *async.BoundedFrequencyRunner // governs calls to syncProxyRules

	// These are effectively const and do not need the mutex to be held.
	syncPeriod    time.Duration
	minSyncPeriod time.Duration
	// Values are CIDR's to exclude when cleaning up IPVS rules.
	excludeCIDRs []*net.IPNet
	// Set to true to set sysctls arp_ignore and arp_announce.
	// NOTE(review): NewProxier consumes strictARP only as a constructor
	// argument and never stores it here, so this field stays false.
	strictARP bool
	iptables  utiliptables.Interface
	ipvs      utilipvs.Interface
	ipset     utilipset.Interface
	exec      utilexec.Interface
	// masqueradeAll is copied from the NewProxier argument of the same name.
	masqueradeAll bool
	// masqueradeMark is "value/mask" in %#08x hex form, both halves being
	// 1<<masqueradeBit (built in NewProxier).
	masqueradeMark string
	localDetector  proxyutiliptables.LocalTrafficDetector
	hostname       string
	nodeIP         net.IP
	portMapper     utilproxy.PortOpener
	recorder       record.EventRecorder

	serviceHealthServer healthcheck.ServiceHealthServer
	healthzServer       healthcheck.ProxierHealthUpdater
	// ipvsScheduler is the IPVS scheduling algorithm name (DefaultScheduler if none was given).
	ipvsScheduler string
	// Added as a member to the struct to allow injection for testing.
	ipGetter IPGetter
	// The following buffers are used to reuse memory and avoid allocations
	// that are significantly impacting performance.
	iptablesData     *bytes.Buffer
	filterChainsData *bytes.Buffer
	natChains        *bytes.Buffer
	filterChains     *bytes.Buffer
	natRules         *bytes.Buffer
	filterRules      *bytes.Buffer
	// Added as a member to the struct to allow injection for testing.
	netlinkHandle NetLinkHandle
	// ipsetList is the list of ipsets that ipvs proxier used, keyed by ipset name.
	ipsetList map[string]*IPSet
	// Values are as a parameter to select the interfaces which nodeport works.
	nodePortAddresses []string
	// networkInterfacer defines an interface for several net library functions.
	// Inject for test purpose.
	networkInterfacer utilproxy.NetworkInterfacer
	// gracefuldeleteManager is created and started (Run) by NewProxier.
	gracefuldeleteManager *GracefulTerminationManager
}
  233. // IPGetter helps get node network interface IP
  234. type IPGetter interface {
  235. NodeIPs() ([]net.IP, error)
  236. }
// realIPGetter is a real NodeIP handler, it implements IPGetter.
// It discovers node IPs by asking netlink for the host's LOCAL addresses.
type realIPGetter struct {
	// nl is the netlink handle used to query local addresses.
	nl NetLinkHandle
}
  242. // NodeIPs returns all LOCAL type IP addresses from host which are taken as the Node IPs of NodePort service.
  243. // It will list source IP exists in local route table with `kernel` protocol type, and filter out IPVS proxier
  244. // created dummy device `kube-ipvs0` For example,
  245. // $ ip route show table local type local proto kernel
  246. // 10.0.0.1 dev kube-ipvs0 scope host src 10.0.0.1
  247. // 10.0.0.10 dev kube-ipvs0 scope host src 10.0.0.10
  248. // 10.0.0.252 dev kube-ipvs0 scope host src 10.0.0.252
  249. // 100.106.89.164 dev eth0 scope host src 100.106.89.164
  250. // 127.0.0.0/8 dev lo scope host src 127.0.0.1
  251. // 127.0.0.1 dev lo scope host src 127.0.0.1
  252. // 172.17.0.1 dev docker0 scope host src 172.17.0.1
  253. // 192.168.122.1 dev virbr0 scope host src 192.168.122.1
  254. // Then filter out dev==kube-ipvs0, and cut the unique src IP fields,
  255. // Node IP set: [100.106.89.164, 127.0.0.1, 192.168.122.1]
  256. func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
  257. // Pass in empty filter device name for list all LOCAL type addresses.
  258. nodeAddress, err := r.nl.GetLocalAddresses("", DefaultDummyDevice)
  259. if err != nil {
  260. return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err)
  261. }
  262. // translate ip string to IP
  263. for _, ipStr := range nodeAddress.UnsortedList() {
  264. ips = append(ips, net.ParseIP(ipStr))
  265. }
  266. return ips, nil
  267. }
  268. // Proxier implements proxy.Provider
  269. var _ proxy.Provider = &Proxier{}
  270. // parseExcludedCIDRs parses the input strings and returns net.IPNet
  271. // The validation has been done earlier so the error condition will never happen under normal conditions
  272. func parseExcludedCIDRs(excludeCIDRs []string) []*net.IPNet {
  273. var cidrExclusions []*net.IPNet
  274. for _, excludedCIDR := range excludeCIDRs {
  275. _, n, err := net.ParseCIDR(excludedCIDR)
  276. if err != nil {
  277. klog.Errorf("Error parsing exclude CIDR %q, err: %v", excludedCIDR, err)
  278. continue
  279. }
  280. cidrExclusions = append(cidrExclusions, n)
  281. }
  282. return cidrExclusions
  283. }
  284. // NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
  285. // Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
  286. // An error will be returned if it fails to update or acquire the initial lock.
  287. // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
  288. // will not terminate if a particular iptables or ipvs call fails.
  289. func NewProxier(ipt utiliptables.Interface,
  290. ipvs utilipvs.Interface,
  291. ipset utilipset.Interface,
  292. sysctl utilsysctl.Interface,
  293. exec utilexec.Interface,
  294. syncPeriod time.Duration,
  295. minSyncPeriod time.Duration,
  296. excludeCIDRs []string,
  297. strictARP bool,
  298. tcpTimeout time.Duration,
  299. tcpFinTimeout time.Duration,
  300. udpTimeout time.Duration,
  301. masqueradeAll bool,
  302. masqueradeBit int,
  303. localDetector proxyutiliptables.LocalTrafficDetector,
  304. hostname string,
  305. nodeIP net.IP,
  306. recorder record.EventRecorder,
  307. healthzServer healthcheck.ProxierHealthUpdater,
  308. scheduler string,
  309. nodePortAddresses []string,
  310. ) (*Proxier, error) {
  311. // Set the route_localnet sysctl we need for
  312. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  313. if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
  314. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
  315. }
  316. }
  317. // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
  318. // are connected to a Linux bridge (but not SDN bridges). Until most
  319. // plugins handle this, log when config is missing
  320. if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
  321. klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
  322. }
  323. // Set the conntrack sysctl we need for
  324. if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
  325. if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
  326. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
  327. }
  328. }
  329. // Set the connection reuse mode
  330. if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
  331. if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
  332. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
  333. }
  334. }
  335. // Set the expire_nodest_conn sysctl we need for
  336. if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
  337. if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
  338. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
  339. }
  340. }
  341. // Set the expire_quiescent_template sysctl we need for
  342. if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
  343. if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
  344. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
  345. }
  346. }
  347. // Set the ip_forward sysctl we need for
  348. if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
  349. if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
  350. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
  351. }
  352. }
  353. if strictARP {
  354. // Set the arp_ignore sysctl we need for
  355. if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
  356. if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
  357. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
  358. }
  359. }
  360. // Set the arp_announce sysctl we need for
  361. if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
  362. if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
  363. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
  364. }
  365. }
  366. }
  367. // Configure IPVS timeouts if any one of the timeout parameters have been set.
  368. // This is the equivalent to running ipvsadm --set, a value of 0 indicates the
  369. // current system timeout should be preserved
  370. if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
  371. if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
  372. klog.Warningf("failed to configure IPVS timeouts: %v", err)
  373. }
  374. }
  375. // Generate the masquerade mark to use for SNAT rules.
  376. masqueradeValue := 1 << uint(masqueradeBit)
  377. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  378. isIPv6 := utilnet.IsIPv6(nodeIP)
  379. klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
  380. if len(scheduler) == 0 {
  381. klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
  382. scheduler = DefaultScheduler
  383. }
  384. serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
  385. endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
  386. proxier := &Proxier{
  387. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  388. serviceMap: make(proxy.ServiceMap),
  389. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
  390. endpointsMap: make(proxy.EndpointsMap),
  391. endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled),
  392. syncPeriod: syncPeriod,
  393. minSyncPeriod: minSyncPeriod,
  394. excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),
  395. iptables: ipt,
  396. masqueradeAll: masqueradeAll,
  397. masqueradeMark: masqueradeMark,
  398. exec: exec,
  399. localDetector: localDetector,
  400. hostname: hostname,
  401. nodeIP: nodeIP,
  402. portMapper: &listenPortOpener{},
  403. recorder: recorder,
  404. serviceHealthServer: serviceHealthServer,
  405. healthzServer: healthzServer,
  406. ipvs: ipvs,
  407. ipvsScheduler: scheduler,
  408. ipGetter: &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
  409. iptablesData: bytes.NewBuffer(nil),
  410. filterChainsData: bytes.NewBuffer(nil),
  411. natChains: bytes.NewBuffer(nil),
  412. natRules: bytes.NewBuffer(nil),
  413. filterChains: bytes.NewBuffer(nil),
  414. filterRules: bytes.NewBuffer(nil),
  415. netlinkHandle: NewNetLinkHandle(isIPv6),
  416. ipset: ipset,
  417. nodePortAddresses: nodePortAddresses,
  418. networkInterfacer: utilproxy.RealNetwork{},
  419. gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
  420. }
  421. // initialize ipsetList with all sets we needed
  422. proxier.ipsetList = make(map[string]*IPSet)
  423. for _, is := range ipsetInfo {
  424. proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
  425. }
  426. burstSyncs := 2
  427. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  428. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  429. proxier.gracefuldeleteManager.Run()
  430. return proxier, nil
  431. }
  432. // NewDualStackProxier returns a new Proxier for dual-stack operation
  433. func NewDualStackProxier(
  434. ipt [2]utiliptables.Interface,
  435. ipvs utilipvs.Interface,
  436. ipset utilipset.Interface,
  437. sysctl utilsysctl.Interface,
  438. exec utilexec.Interface,
  439. syncPeriod time.Duration,
  440. minSyncPeriod time.Duration,
  441. excludeCIDRs []string,
  442. strictARP bool,
  443. tcpTimeout time.Duration,
  444. tcpFinTimeout time.Duration,
  445. udpTimeout time.Duration,
  446. masqueradeAll bool,
  447. masqueradeBit int,
  448. localDetectors [2]proxyutiliptables.LocalTrafficDetector,
  449. hostname string,
  450. nodeIP [2]net.IP,
  451. recorder record.EventRecorder,
  452. healthzServer healthcheck.ProxierHealthUpdater,
  453. scheduler string,
  454. nodePortAddresses []string,
  455. ) (proxy.Provider, error) {
  456. safeIpset := newSafeIpset(ipset)
  457. // Create an ipv4 instance of the single-stack proxier
  458. ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl,
  459. exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
  460. tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
  461. localDetectors[0], hostname, nodeIP[0],
  462. recorder, healthzServer, scheduler, nodePortAddresses)
  463. if err != nil {
  464. return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
  465. }
  466. ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl,
  467. exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
  468. tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
  469. localDetectors[1], hostname, nodeIP[1],
  470. nil, nil, scheduler, nodePortAddresses)
  471. if err != nil {
  472. return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
  473. }
  474. // Return a meta-proxier that dispatch calls between the two
  475. // single-stack proxier instances
  476. return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
  477. }
  478. func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
  479. var filteredCIDRs []string
  480. for _, cidr := range cidrs {
  481. if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 {
  482. filteredCIDRs = append(filteredCIDRs, cidr)
  483. }
  484. }
  485. return filteredCIDRs
  486. }
// serviceInfo is the IPVS proxier's per-service-port record: the shared
// proxy.BaseServiceInfo plus values precomputed by newServiceInfo.
// internal struct for string service information
type serviceInfo struct {
	*proxy.BaseServiceInfo
	// The following fields are computed and stored for performance reasons.
	// serviceNameString caches proxy.ServicePortName.String() for this port,
	// so it is not rebuilt on every use.
	serviceNameString string
}
  493. // returns a new proxy.ServicePort which abstracts a serviceInfo
  494. func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
  495. info := &serviceInfo{BaseServiceInfo: baseInfo}
  496. // Store the following for performance reasons.
  497. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  498. svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
  499. info.serviceNameString = svcPortName.String()
  500. return info
  501. }
  // KernelHandler can handle the current installed kernel modules.
  // CanUseIPVSProxier uses it to inspect the node's kernel before choosing the
  // ipvs Proxier.
  type KernelHandler interface {
  	// GetModules returns the names of the kernel modules available to the proxier.
  	GetModules() ([]string, error)
  	// GetKernelVersion returns the kernel version as a string; callers in this
  	// file parse it with version.ParseGeneric.
  	GetKernelVersion() (string, error)
  }
  // LinuxKernelHandler implements KernelHandler interface.
  // It reads kernel information from /proc, /lib/modules and /boot, and tries to
  // load missing IPVS modules with modprobe.
  type LinuxKernelHandler struct {
  	// executor runs external commands (modprobe, from GetModules).
  	executor utilexec.Interface
  }
  511. // NewLinuxKernelHandler initializes LinuxKernelHandler with exec.
  512. func NewLinuxKernelHandler() *LinuxKernelHandler {
  513. return &LinuxKernelHandler{
  514. executor: utilexec.New(),
  515. }
  516. }
  517. // GetModules returns all installed kernel modules.
  518. func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
  519. // Check whether IPVS required kernel modules are built-in
  520. kernelVersionStr, err := handle.GetKernelVersion()
  521. if err != nil {
  522. return nil, err
  523. }
  524. kernelVersion, err := version.ParseGeneric(kernelVersionStr)
  525. if err != nil {
  526. return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
  527. }
  528. ipvsModules := utilipvs.GetRequiredIPVSModules(kernelVersion)
  529. var bmods []string
  530. // Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION
  531. modulesFile, err := os.Open("/proc/modules")
  532. if err == os.ErrNotExist {
  533. klog.Warningf("Failed to read file /proc/modules with error %v. Assuming this is a kernel without loadable modules support enabled", err)
  534. kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr)
  535. kConfig, err := ioutil.ReadFile(kernelConfigFile)
  536. if err != nil {
  537. return nil, fmt.Errorf("Failed to read Kernel Config file %s with error %v", kernelConfigFile, err)
  538. }
  539. for _, module := range ipvsModules {
  540. if match, _ := regexp.Match("CONFIG_"+strings.ToUpper(module)+"=y", kConfig); match {
  541. bmods = append(bmods, module)
  542. }
  543. }
  544. return bmods, nil
  545. }
  546. if err != nil {
  547. return nil, fmt.Errorf("Failed to read file /proc/modules with error %v", err)
  548. }
  549. mods, err := getFirstColumn(modulesFile)
  550. if err != nil {
  551. return nil, fmt.Errorf("failed to find loaded kernel modules: %v", err)
  552. }
  553. builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr)
  554. b, err := ioutil.ReadFile(builtinModsFilePath)
  555. if err != nil {
  556. klog.Warningf("Failed to read file %s with error %v. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", builtinModsFilePath, err)
  557. }
  558. for _, module := range ipvsModules {
  559. if match, _ := regexp.Match(module+".ko", b); match {
  560. bmods = append(bmods, module)
  561. } else {
  562. // Try to load the required IPVS kernel modules if not built in
  563. err := handle.executor.Command("modprobe", "--", module).Run()
  564. if err != nil {
  565. klog.Warningf("Failed to load kernel module %v with modprobe. "+
  566. "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", module)
  567. }
  568. }
  569. }
  570. return append(mods, bmods...), nil
  571. }
  // getFirstColumn reads r to EOF and returns the first whitespace-separated
  // field of every line that has one. Blank and whitespace-only lines are skipped.
  // For input without such lines the result is an empty, non-nil slice.
  func getFirstColumn(r io.Reader) ([]string, error) {
  	content, readErr := ioutil.ReadAll(r)
  	if readErr != nil {
  		return nil, readErr
  	}
  	rows := strings.Split(string(content), "\n")
  	firstWords := make([]string, 0, len(rows))
  	for _, row := range rows {
  		if cols := strings.Fields(row); len(cols) != 0 {
  			firstWords = append(firstWords, cols[0])
  		}
  	}
  	return firstWords, nil
  }
  589. // GetKernelVersion returns currently running kernel version.
  590. func (handle *LinuxKernelHandler) GetKernelVersion() (string, error) {
  591. kernelVersionFile := "/proc/sys/kernel/osrelease"
  592. fileContent, err := ioutil.ReadFile(kernelVersionFile)
  593. if err != nil {
  594. return "", fmt.Errorf("error reading osrelease file %q: %v", kernelVersionFile, err)
  595. }
  596. return strings.TrimSpace(string(fileContent)), nil
  597. }
  598. // CanUseIPVSProxier returns true if we can use the ipvs Proxier.
  599. // This is determined by checking if all the required kernel modules can be loaded. It may
  600. // return an error if it fails to get the kernel modules information without error, in which
  601. // case it will also return false.
  602. func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner) (bool, error) {
  603. mods, err := handle.GetModules()
  604. if err != nil {
  605. return false, fmt.Errorf("error getting installed ipvs required kernel modules: %v", err)
  606. }
  607. loadModules := sets.NewString()
  608. loadModules.Insert(mods...)
  609. kernelVersionStr, err := handle.GetKernelVersion()
  610. if err != nil {
  611. return false, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %v", err)
  612. }
  613. kernelVersion, err := version.ParseGeneric(kernelVersionStr)
  614. if err != nil {
  615. return false, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
  616. }
  617. mods = utilipvs.GetRequiredIPVSModules(kernelVersion)
  618. wantModules := sets.NewString()
  619. wantModules.Insert(mods...)
  620. modules := wantModules.Difference(loadModules).UnsortedList()
  621. var missingMods []string
  622. ConntrackiMissingCounter := 0
  623. for _, mod := range modules {
  624. if strings.Contains(mod, "nf_conntrack") {
  625. ConntrackiMissingCounter++
  626. } else {
  627. missingMods = append(missingMods, mod)
  628. }
  629. }
  630. if ConntrackiMissingCounter == 2 {
  631. missingMods = append(missingMods, "nf_conntrack_ipv4(or nf_conntrack for Linux kernel 4.19 and later)")
  632. }
  633. if len(missingMods) != 0 {
  634. return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", missingMods)
  635. }
  636. // Check ipset version
  637. versionString, err := ipsetver.GetVersion()
  638. if err != nil {
  639. return false, fmt.Errorf("error getting ipset version, error: %v", err)
  640. }
  641. if !checkMinVersion(versionString) {
  642. return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
  643. }
  644. return true, nil
  645. }
  646. // CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
  647. // It returns true if an error was encountered. Errors are logged.
  648. func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
  649. // Unlink the iptables chains created by ipvs Proxier
  650. for _, jc := range iptablesJumpChain {
  651. args := []string{
  652. "-m", "comment", "--comment", jc.comment,
  653. "-j", string(jc.to),
  654. }
  655. if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
  656. if !utiliptables.IsNotFoundError(err) {
  657. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  658. encounteredError = true
  659. }
  660. }
  661. }
  662. // Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first.
  663. for _, ch := range iptablesCleanupChains {
  664. if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
  665. if !utiliptables.IsNotFoundError(err) {
  666. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  667. encounteredError = true
  668. }
  669. }
  670. }
  671. // Remove all of our chains.
  672. for _, ch := range iptablesCleanupChains {
  673. if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
  674. if !utiliptables.IsNotFoundError(err) {
  675. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  676. encounteredError = true
  677. }
  678. }
  679. }
  680. return encounteredError
  681. }
  682. // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
  683. func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface, cleanupIPVS bool) (encounteredError bool) {
  684. if cleanupIPVS {
  685. // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere.
  686. if ipvs == nil {
  687. return true
  688. }
  689. encounteredError = false
  690. err := ipvs.Flush()
  691. if err != nil {
  692. klog.Errorf("Error flushing IPVS rules: %v", err)
  693. encounteredError = true
  694. }
  695. }
  696. // Delete dummy interface created by ipvs Proxier.
  697. nl := NewNetLinkHandle(false)
  698. err := nl.DeleteDummyDevice(DefaultDummyDevice)
  699. if err != nil {
  700. klog.Errorf("Error deleting dummy device %s created by IPVS proxier: %v", DefaultDummyDevice, err)
  701. encounteredError = true
  702. }
  703. // Clear iptables created by ipvs Proxier.
  704. encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
  705. // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
  706. // iptables since we can NOT delete ip set which is still referenced by iptables.
  707. for _, set := range ipsetInfo {
  708. err = ipset.DestroySet(set.name)
  709. if err != nil {
  710. if !utilipset.IsNotFoundError(err) {
  711. klog.Errorf("Error removing ipset %s, error: %v", set.name, err)
  712. encounteredError = true
  713. }
  714. }
  715. }
  716. return encounteredError
  717. }
  718. // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
  719. func (proxier *Proxier) Sync() {
  720. if proxier.healthzServer != nil {
  721. proxier.healthzServer.QueuedUpdate()
  722. }
  723. proxier.syncRunner.Run()
  724. }
  725. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  726. func (proxier *Proxier) SyncLoop() {
  727. // Update healthz timestamp at beginning in case Sync() never succeeds.
  728. if proxier.healthzServer != nil {
  729. proxier.healthzServer.Updated()
  730. }
  731. proxier.syncRunner.Loop(wait.NeverStop)
  732. }
  733. func (proxier *Proxier) setInitialized(value bool) {
  734. var initialized int32
  735. if value {
  736. initialized = 1
  737. }
  738. atomic.StoreInt32(&proxier.initialized, initialized)
  739. }
  740. func (proxier *Proxier) isInitialized() bool {
  741. return atomic.LoadInt32(&proxier.initialized) > 0
  742. }
  743. // OnServiceAdd is called whenever creation of new service object is observed.
  744. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  745. proxier.OnServiceUpdate(nil, service)
  746. }
  747. // OnServiceUpdate is called whenever modification of an existing service object is observed.
  748. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  749. if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
  750. proxier.Sync()
  751. }
  752. }
  753. // OnServiceDelete is called whenever deletion of an existing service object is observed.
  754. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  755. proxier.OnServiceUpdate(service, nil)
  756. }
  757. // OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  758. func (proxier *Proxier) OnServiceSynced() {
  759. proxier.mu.Lock()
  760. proxier.servicesSynced = true
  761. if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
  762. proxier.setInitialized(proxier.endpointSlicesSynced)
  763. } else {
  764. proxier.setInitialized(proxier.endpointsSynced)
  765. }
  766. proxier.mu.Unlock()
  767. // Sync unconditionally - this is called once per lifetime.
  768. proxier.syncProxyRules()
  769. }
  770. // OnEndpointsAdd is called whenever creation of new endpoints object is observed.
  771. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  772. proxier.OnEndpointsUpdate(nil, endpoints)
  773. }
  774. // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
  775. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  776. if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
  777. proxier.Sync()
  778. }
  779. }
  780. // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
  781. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  782. proxier.OnEndpointsUpdate(endpoints, nil)
  783. }
  784. // OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  785. func (proxier *Proxier) OnEndpointsSynced() {
  786. proxier.mu.Lock()
  787. proxier.endpointsSynced = true
  788. proxier.setInitialized(proxier.servicesSynced)
  789. proxier.mu.Unlock()
  790. // Sync unconditionally - this is called once per lifetime.
  791. proxier.syncProxyRules()
  792. }
  793. // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
  794. // is observed.
  795. func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
  796. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  797. proxier.Sync()
  798. }
  799. }
  800. // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
  801. // slice object is observed.
  802. func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
  803. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  804. proxier.Sync()
  805. }
  806. }
  807. // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
  808. // object is observed.
  809. func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
  810. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
  811. proxier.Sync()
  812. }
  813. }
  814. // OnEndpointSlicesSynced is called once all the initial event handlers were
  815. // called and the state is fully propagated to local cache.
  816. func (proxier *Proxier) OnEndpointSlicesSynced() {
  817. proxier.mu.Lock()
  818. proxier.endpointSlicesSynced = true
  819. proxier.setInitialized(proxier.servicesSynced)
  820. proxier.mu.Unlock()
  821. // Sync unconditionally - this is called once per lifetime.
  822. proxier.syncProxyRules()
  823. }
  824. // OnNodeAdd is called whenever creation of new node object
  825. // is observed.
  826. func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
  827. if node.Name != proxier.hostname {
  828. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  829. return
  830. }
  831. if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
  832. return
  833. }
  834. proxier.mu.Lock()
  835. proxier.nodeLabels = node.Labels
  836. proxier.mu.Unlock()
  837. proxier.syncProxyRules()
  838. }
  839. // OnNodeUpdate is called whenever modification of an existing
  840. // node object is observed.
  841. func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
  842. if node.Name != proxier.hostname {
  843. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  844. return
  845. }
  846. if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
  847. return
  848. }
  849. proxier.mu.Lock()
  850. proxier.nodeLabels = node.Labels
  851. proxier.mu.Unlock()
  852. proxier.syncProxyRules()
  853. }
  854. // OnNodeDelete is called whever deletion of an existing node
  855. // object is observed.
  856. func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
  857. if node.Name != proxier.hostname {
  858. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  859. return
  860. }
  861. proxier.mu.Lock()
  862. proxier.nodeLabels = nil
  863. proxier.mu.Unlock()
  864. proxier.syncProxyRules()
  865. }
  // OnNodeSynced is called once all the initial event handlers were
  // called and the state is fully propagated to local cache.
  // The ipvs proxier takes no action here: node labels are recorded by the
  // OnNodeAdd/OnNodeUpdate/OnNodeDelete handlers as events arrive.
  func (proxier *Proxier) OnNodeSynced() {
  }
  // EntryInvalidErr is the fmt format string used to log an ipset entry that
  // failed validation. The first verb receives the entry and the second the
  // ipset name, e.g. fmt.Sprintf(EntryInvalidErr, entry, set.Name).
  const EntryInvalidErr = "error adding entry %s to ipset %s"
  872. // This is where all of the ipvs calls happen.
  873. // assumes proxier.mu is held
  874. func (proxier *Proxier) syncProxyRules() {
  875. proxier.mu.Lock()
  876. defer proxier.mu.Unlock()
  877. // don't sync rules till we've received services and endpoints
  878. if !proxier.isInitialized() {
  879. klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
  880. return
  881. }
  882. // Keep track of how long syncs take.
  883. start := time.Now()
  884. defer func() {
  885. metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
  886. klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
  887. }()
  888. localAddrs, err := utilproxy.GetLocalAddrs()
  889. if err != nil {
  890. klog.Errorf("Failed to get local addresses during proxy sync: %v, assuming external IPs are not local", err)
  891. } else if len(localAddrs) == 0 {
  892. klog.Warning("No local addresses found, assuming all external IPs are not local")
  893. }
  894. localAddrSet := utilnet.IPSet{}
  895. localAddrSet.Insert(localAddrs...)
  896. // We assume that if this was called, we really want to sync them,
  897. // even if nothing changed in the meantime. In other words, callers are
  898. // responsible for detecting no-op changes and not calling this function.
  899. serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
  900. endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
  901. staleServices := serviceUpdateResult.UDPStaleClusterIP
  902. // merge stale services gathered from updateEndpointsMap
  903. for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
  904. if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
  905. klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
  906. staleServices.Insert(svcInfo.ClusterIP().String())
  907. for _, extIP := range svcInfo.ExternalIPStrings() {
  908. staleServices.Insert(extIP)
  909. }
  910. }
  911. }
  912. klog.V(3).Infof("Syncing ipvs Proxier rules")
  913. // Begin install iptables
  914. // Reset all buffers used later.
  915. // This is to avoid memory reallocations and thus improve performance.
  916. proxier.natChains.Reset()
  917. proxier.natRules.Reset()
  918. proxier.filterChains.Reset()
  919. proxier.filterRules.Reset()
  920. // Write table headers.
  921. writeLine(proxier.filterChains, "*filter")
  922. writeLine(proxier.natChains, "*nat")
  923. proxier.createAndLinkeKubeChain()
  924. // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
  925. _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  926. if err != nil {
  927. klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
  928. return
  929. }
  930. // make sure ip sets exists in the system.
  931. for _, set := range proxier.ipsetList {
  932. if err := ensureIPSet(set); err != nil {
  933. return
  934. }
  935. set.resetEntries()
  936. }
  937. // Accumulate the set of local ports that we will be holding open once this update is complete
  938. replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
  939. // activeIPVSServices represents IPVS service successfully created in this round of sync
  940. activeIPVSServices := map[string]bool{}
  941. // currentIPVSServices represent IPVS services listed from the system
  942. currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
  943. // activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
  944. activeBindAddrs := map[string]bool{}
  945. hasNodePort := false
  946. for _, svc := range proxier.serviceMap {
  947. svcInfo, ok := svc.(*serviceInfo)
  948. if ok && svcInfo.NodePort() != 0 {
  949. hasNodePort = true
  950. break
  951. }
  952. }
  953. // Both nodeAddresses and nodeIPs can be reused for all nodePort services
  954. // and only need to be computed if we have at least one nodePort service.
  955. var (
  956. // List of node addresses to listen on if a nodePort is set.
  957. nodeAddresses []string
  958. // List of node IP addresses to be used as IPVS services if nodePort is set.
  959. nodeIPs []net.IP
  960. )
  961. if hasNodePort {
  962. nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
  963. if err != nil {
  964. klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
  965. }
  966. if err == nil && nodeAddrSet.Len() > 0 {
  967. nodeAddresses = nodeAddrSet.List()
  968. for _, address := range nodeAddresses {
  969. if utilproxy.IsZeroCIDR(address) {
  970. nodeIPs, err = proxier.ipGetter.NodeIPs()
  971. if err != nil {
  972. klog.Errorf("Failed to list all node IPs from host, err: %v", err)
  973. }
  974. break
  975. }
  976. nodeIPs = append(nodeIPs, net.ParseIP(address))
  977. }
  978. }
  979. }
  980. // Build IPVS rules for each service.
  981. for svcName, svc := range proxier.serviceMap {
  982. svcInfo, ok := svc.(*serviceInfo)
  983. if !ok {
  984. klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
  985. continue
  986. }
  987. isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
  988. protocol := strings.ToLower(string(svcInfo.Protocol()))
  989. // Precompute svcNameString; with many services the many calls
  990. // to ServicePortName.String() show up in CPU profiles.
  991. svcNameString := svcName.String()
  992. // Handle traffic that loops back to the originator with SNAT.
  993. for _, e := range proxier.endpointsMap[svcName] {
  994. ep, ok := e.(*proxy.BaseEndpointInfo)
  995. if !ok {
  996. klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
  997. continue
  998. }
  999. if !ep.IsLocal {
  1000. continue
  1001. }
  1002. epIP := ep.IP()
  1003. epPort, err := ep.Port()
  1004. // Error parsing this endpoint has been logged. Skip to next endpoint.
  1005. if epIP == "" || err != nil {
  1006. continue
  1007. }
  1008. entry := &utilipset.Entry{
  1009. IP: epIP,
  1010. Port: epPort,
  1011. Protocol: protocol,
  1012. IP2: epIP,
  1013. SetType: utilipset.HashIPPortIP,
  1014. }
  1015. if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
  1016. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
  1017. continue
  1018. }
  1019. proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
  1020. }
  1021. // Capture the clusterIP.
  1022. // ipset call
  1023. entry := &utilipset.Entry{
  1024. IP: svcInfo.ClusterIP().String(),
  1025. Port: svcInfo.Port(),
  1026. Protocol: protocol,
  1027. SetType: utilipset.HashIPPort,
  1028. }
  1029. // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1030. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1031. if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
  1032. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
  1033. continue
  1034. }
  1035. proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
  1036. // ipvs call
  1037. serv := &utilipvs.VirtualServer{
  1038. Address: svcInfo.ClusterIP(),
  1039. Port: uint16(svcInfo.Port()),
  1040. Protocol: string(svcInfo.Protocol()),
  1041. Scheduler: proxier.ipvsScheduler,
  1042. }
  1043. // Set session affinity flag and timeout for IPVS service
  1044. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1045. serv.Flags |= utilipvs.FlagPersistent
  1046. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1047. }
  1048. // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
  1049. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1050. activeIPVSServices[serv.String()] = true
  1051. activeBindAddrs[serv.Address.String()] = true
  1052. // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
  1053. // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
  1054. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
  1055. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1056. }
  1057. } else {
  1058. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1059. }
  1060. // Capture externalIPs.
  1061. for _, externalIP := range svcInfo.ExternalIPStrings() {
  1062. // If the "external" IP happens to be an IP that is local to this
  1063. // machine, hold the local port open so no other process can open it
  1064. // (because the socket might open but it would never work).
  1065. if localAddrSet.Len() > 0 && (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
  1066. // We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
  1067. lp := utilproxy.LocalPort{
  1068. Description: "externalIP for " + svcNameString,
  1069. IP: externalIP,
  1070. Port: svcInfo.Port(),
  1071. Protocol: protocol,
  1072. }
  1073. if proxier.portsMap[lp] != nil {
  1074. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  1075. replacementPortsMap[lp] = proxier.portsMap[lp]
  1076. } else {
  1077. socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
  1078. if err != nil {
  1079. msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
  1080. proxier.recorder.Eventf(
  1081. &v1.ObjectReference{
  1082. Kind: "Node",
  1083. Name: proxier.hostname,
  1084. UID: types.UID(proxier.hostname),
  1085. Namespace: "",
  1086. }, v1.EventTypeWarning, err.Error(), msg)
  1087. klog.Error(msg)
  1088. continue
  1089. }
  1090. replacementPortsMap[lp] = socket
  1091. }
  1092. } // We're holding the port, so it's OK to install IPVS rules.
  1093. // ipset call
  1094. entry := &utilipset.Entry{
  1095. IP: externalIP,
  1096. Port: svcInfo.Port(),
  1097. Protocol: protocol,
  1098. SetType: utilipset.HashIPPort,
  1099. }
  1100. // We have to SNAT packets to external IPs.
  1101. if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
  1102. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
  1103. continue
  1104. }
  1105. proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
  1106. // ipvs call
  1107. serv := &utilipvs.VirtualServer{
  1108. Address: net.ParseIP(externalIP),
  1109. Port: uint16(svcInfo.Port()),
  1110. Protocol: string(svcInfo.Protocol()),
  1111. Scheduler: proxier.ipvsScheduler,
  1112. }
  1113. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1114. serv.Flags |= utilipvs.FlagPersistent
  1115. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1116. }
  1117. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1118. activeIPVSServices[serv.String()] = true
  1119. activeBindAddrs[serv.Address.String()] = true
  1120. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
  1121. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1122. }
  1123. } else {
  1124. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1125. }
  1126. }
  1127. // Capture load-balancer ingress.
  1128. for _, ingress := range svcInfo.LoadBalancerIPStrings() {
  1129. if ingress != "" {
  1130. // ipset call
  1131. entry = &utilipset.Entry{
  1132. IP: ingress,
  1133. Port: svcInfo.Port(),
  1134. Protocol: protocol,
  1135. SetType: utilipset.HashIPPort,
  1136. }
  1137. // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1138. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1139. // If we are proxying globally, we need to masquerade in case we cross nodes.
  1140. // If we are proxying only locally, we can retain the source IP.
  1141. if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
  1142. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
  1143. continue
  1144. }
  1145. proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
  1146. // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
  1147. if svcInfo.OnlyNodeLocalEndpoints() {
  1148. if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
  1149. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
  1150. continue
  1151. }
  1152. proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
  1153. }
  1154. if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
  1155. // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
  1156. // This currently works for loadbalancers that preserves source ips.
  1157. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
  1158. if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
  1159. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
  1160. continue
  1161. }
  1162. proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
  1163. allowFromNode := false
  1164. for _, src := range svcInfo.LoadBalancerSourceRanges() {
  1165. // ipset call
  1166. entry = &utilipset.Entry{
  1167. IP: ingress,
  1168. Port: svcInfo.Port(),
  1169. Protocol: protocol,
  1170. Net: src,
  1171. SetType: utilipset.HashIPPortNet,
  1172. }
  1173. // enumerate all white list source cidr
  1174. if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
  1175. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
  1176. continue
  1177. }
  1178. proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
  1179. // ignore error because it has been validated
  1180. _, cidr, _ := net.ParseCIDR(src)
  1181. if cidr.Contains(proxier.nodeIP) {
  1182. allowFromNode = true
  1183. }
  1184. }
  1185. // generally, ip route rule was added to intercept request to loadbalancer vip from the
  1186. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
  1187. // Need to add the following rule to allow request on host.
  1188. if allowFromNode {
  1189. entry = &utilipset.Entry{
  1190. IP: ingress,
  1191. Port: svcInfo.Port(),
  1192. Protocol: protocol,
  1193. IP2: ingress,
  1194. SetType: utilipset.HashIPPortIP,
  1195. }
  1196. // enumerate all white list source ip
  1197. if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
  1198. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name))
  1199. continue
  1200. }
  1201. proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
  1202. }
  1203. }
  1204. // ipvs call
  1205. serv := &utilipvs.VirtualServer{
  1206. Address: net.ParseIP(ingress),
  1207. Port: uint16(svcInfo.Port()),
  1208. Protocol: string(svcInfo.Protocol()),
  1209. Scheduler: proxier.ipvsScheduler,
  1210. }
  1211. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1212. serv.Flags |= utilipvs.FlagPersistent
  1213. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1214. }
  1215. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1216. activeIPVSServices[serv.String()] = true
  1217. activeBindAddrs[serv.Address.String()] = true
  1218. if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
  1219. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1220. }
  1221. } else {
  1222. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1223. }
  1224. }
  1225. }
  1226. if svcInfo.NodePort() != 0 {
  1227. if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
  1228. // Skip nodePort configuration since an error occurred when
  1229. // computing nodeAddresses or nodeIPs.
  1230. continue
  1231. }
  1232. var lps []utilproxy.LocalPort
  1233. for _, address := range nodeAddresses {
  1234. lp := utilproxy.LocalPort{
  1235. Description: "nodePort for " + svcNameString,
  1236. IP: address,
  1237. Port: svcInfo.NodePort(),
  1238. Protocol: protocol,
  1239. }
  1240. if utilproxy.IsZeroCIDR(address) {
  1241. // Empty IP address means all
  1242. lp.IP = ""
  1243. lps = append(lps, lp)
  1244. // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
  1245. break
  1246. }
  1247. lps = append(lps, lp)
  1248. }
  1249. // For ports on node IPs, open the actual port and hold it.
  1250. for _, lp := range lps {
  1251. if proxier.portsMap[lp] != nil {
  1252. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  1253. replacementPortsMap[lp] = proxier.portsMap[lp]
  1254. // We do not start listening on SCTP ports, according to our agreement in the
  1255. // SCTP support KEP
  1256. } else if svcInfo.Protocol() != v1.ProtocolSCTP {
  1257. socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
  1258. if err != nil {
  1259. klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
  1260. continue
  1261. }
  1262. if lp.Protocol == "udp" {
  1263. conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
  1264. }
  1265. replacementPortsMap[lp] = socket
  1266. } // We're holding the port, so it's OK to install ipvs rules.
  1267. }
  1268. // Nodeports need SNAT, unless they're local.
  1269. // ipset call
  1270. var (
  1271. nodePortSet *IPSet
  1272. entries []*utilipset.Entry
  1273. )
  1274. switch protocol {
  1275. case "tcp":
  1276. nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
  1277. entries = []*utilipset.Entry{{
  1278. // No need to provide ip info
  1279. Port: svcInfo.NodePort(),
  1280. Protocol: protocol,
  1281. SetType: utilipset.BitmapPort,
  1282. }}
  1283. case "udp":
  1284. nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
  1285. entries = []*utilipset.Entry{{
  1286. // No need to provide ip info
  1287. Port: svcInfo.NodePort(),
  1288. Protocol: protocol,
  1289. SetType: utilipset.BitmapPort,
  1290. }}
  1291. case "sctp":
  1292. nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
  1293. // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
  1294. entries = []*utilipset.Entry{}
  1295. for _, nodeIP := range nodeIPs {
  1296. entries = append(entries, &utilipset.Entry{
  1297. IP: nodeIP.String(),
  1298. Port: svcInfo.NodePort(),
  1299. Protocol: protocol,
  1300. SetType: utilipset.HashIPPort,
  1301. })
  1302. }
  1303. default:
  1304. // It should never hit
  1305. klog.Errorf("Unsupported protocol type: %s", protocol)
  1306. }
  1307. if nodePortSet != nil {
  1308. entryInvalidErr := false
  1309. for _, entry := range entries {
  1310. if valid := nodePortSet.validateEntry(entry); !valid {
  1311. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
  1312. entryInvalidErr = true
  1313. break
  1314. }
  1315. nodePortSet.activeEntries.Insert(entry.String())
  1316. }
  1317. if entryInvalidErr {
  1318. continue
  1319. }
  1320. }
  1321. // Add externaltrafficpolicy=local type nodeport entry
  1322. if svcInfo.OnlyNodeLocalEndpoints() {
  1323. var nodePortLocalSet *IPSet
  1324. switch protocol {
  1325. case "tcp":
  1326. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
  1327. case "udp":
  1328. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
  1329. case "sctp":
  1330. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
  1331. default:
  1332. // It should never hit
  1333. klog.Errorf("Unsupported protocol type: %s", protocol)
  1334. }
  1335. if nodePortLocalSet != nil {
  1336. entryInvalidErr := false
  1337. for _, entry := range entries {
  1338. if valid := nodePortLocalSet.validateEntry(entry); !valid {
  1339. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
  1340. entryInvalidErr = true
  1341. break
  1342. }
  1343. nodePortLocalSet.activeEntries.Insert(entry.String())
  1344. }
  1345. if entryInvalidErr {
  1346. continue
  1347. }
  1348. }
  1349. }
  1350. // Build ipvs kernel routes for each node ip address
  1351. for _, nodeIP := range nodeIPs {
  1352. // ipvs call
  1353. serv := &utilipvs.VirtualServer{
  1354. Address: nodeIP,
  1355. Port: uint16(svcInfo.NodePort()),
  1356. Protocol: string(svcInfo.Protocol()),
  1357. Scheduler: proxier.ipvsScheduler,
  1358. }
  1359. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1360. serv.Flags |= utilipvs.FlagPersistent
  1361. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1362. }
  1363. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
  1364. if err := proxier.syncService(svcNameString, serv, false); err == nil {
  1365. activeIPVSServices[serv.String()] = true
  1366. if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
  1367. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1368. }
  1369. } else {
  1370. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1371. }
  1372. }
  1373. }
  1374. }
  1375. // sync ipset entries
  1376. for _, set := range proxier.ipsetList {
  1377. set.syncIPSetEntries()
  1378. }
  1379. // Tail call iptables rules for ipset, make sure only call iptables once
  1380. // in a single loop per ip set.
  1381. proxier.writeIptablesRules()
  1382. // Sync iptables rules.
  1383. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
  1384. proxier.iptablesData.Reset()
  1385. proxier.iptablesData.Write(proxier.natChains.Bytes())
  1386. proxier.iptablesData.Write(proxier.natRules.Bytes())
  1387. proxier.iptablesData.Write(proxier.filterChains.Bytes())
  1388. proxier.iptablesData.Write(proxier.filterRules.Bytes())
  1389. klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
  1390. err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  1391. if err != nil {
  1392. klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
  1393. metrics.IptablesRestoreFailuresTotal.Inc()
  1394. // Revert new local ports.
  1395. utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
  1396. return
  1397. }
  1398. for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
  1399. for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
  1400. latency := metrics.SinceInSeconds(lastChangeTriggerTime)
  1401. metrics.NetworkProgrammingLatency.Observe(latency)
  1402. klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
  1403. }
  1404. }
  1405. // Close old local ports and save new ones.
  1406. for k, v := range proxier.portsMap {
  1407. if replacementPortsMap[k] == nil {
  1408. v.Close()
  1409. }
  1410. }
  1411. proxier.portsMap = replacementPortsMap
  1412. // Get legacy bind address
  1413. // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
  1414. currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  1415. if err != nil {
  1416. klog.Errorf("Failed to get bind address, err: %v", err)
  1417. }
  1418. legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
  1419. // Clean up legacy IPVS services and unbind addresses
  1420. appliedSvcs, err := proxier.ipvs.GetVirtualServers()
  1421. if err == nil {
  1422. for _, appliedSvc := range appliedSvcs {
  1423. currentIPVSServices[appliedSvc.String()] = appliedSvc
  1424. }
  1425. } else {
  1426. klog.Errorf("Failed to get ipvs service, err: %v", err)
  1427. }
  1428. proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
  1429. if proxier.healthzServer != nil {
  1430. proxier.healthzServer.Updated()
  1431. }
  1432. metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1433. // Update service healthchecks. The endpoints list might include services that are
  1434. // not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1435. // will just drop those endpoints.
  1436. if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
  1437. klog.Errorf("Error syncing healthcheck services: %v", err)
  1438. }
  1439. if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
  1440. klog.Errorf("Error syncing healthcheck endpoints: %v", err)
  1441. }
  1442. // Finish housekeeping.
  1443. // TODO: these could be made more consistent.
  1444. for _, svcIP := range staleServices.UnsortedList() {
  1445. if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
  1446. klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
  1447. }
  1448. }
  1449. proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
  1450. }
  1451. // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
  1452. // according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
  1453. // some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
  1454. func (proxier *Proxier) writeIptablesRules() {
  1455. // We are creating those slices ones here to avoid memory reallocations
  1456. // in every loop. Note that reuse the memory, instead of doing:
  1457. // slice = <some new slice>
  1458. // you should always do one of the below:
  1459. // slice = slice[:0] // and then append to it
  1460. // slice = append(slice[:0], ...)
  1461. // To avoid growing this slice, we arbitrarily set its size to 64,
  1462. // there is never more than that many arguments for a single line.
  1463. // Note that even if we go over 64, it will still be correct - it
  1464. // is just for efficiency, not correctness.
  1465. args := make([]string, 64)
  1466. for _, set := range ipsetWithIptablesChain {
  1467. if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
  1468. args = append(args[:0], "-A", set.from)
  1469. if set.protocolMatch != "" {
  1470. args = append(args, "-p", set.protocolMatch)
  1471. }
  1472. args = append(args,
  1473. "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
  1474. "-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
  1475. set.matchType,
  1476. )
  1477. writeLine(proxier.natRules, append(args, "-j", set.to)...)
  1478. }
  1479. }
  1480. if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
  1481. args = append(args[:0],
  1482. "-A", string(kubeServicesChain),
  1483. "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
  1484. "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
  1485. )
  1486. if proxier.masqueradeAll {
  1487. writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
  1488. } else if proxier.localDetector.IsImplemented() {
  1489. // This masquerades off-cluster traffic to a service VIP. The idea
  1490. // is that you can establish a static route for your Service range,
  1491. // routing to any node, and that node will bridge into the Service
  1492. // for you. Since that might bounce off-node, we masquerade here.
  1493. // If/when we support "Local" policy for VIPs, we should update this.
  1494. writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...)
  1495. } else {
  1496. // Masquerade all OUTPUT traffic coming from a service ip.
  1497. // The kube dummy interface has all service VIPs assigned which
  1498. // results in the service VIP being picked as the source IP to reach
  1499. // a VIP. This leads to a connection from VIP:<random port> to
  1500. // VIP:<service port>.
  1501. // Always masquerading OUTPUT (node-originating) traffic with a VIP
  1502. // source ip and service port destination fixes the outgoing connections.
  1503. writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
  1504. }
  1505. }
  1506. if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
  1507. // Build masquerade rules for packets to external IPs.
  1508. args = append(args[:0],
  1509. "-A", string(kubeServicesChain),
  1510. "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
  1511. "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
  1512. "dst,dst",
  1513. )
  1514. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
  1515. // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
  1516. // nor from a local process to be forwarded to the service.
  1517. // This rule roughly translates to "all traffic from off-machine".
  1518. // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
  1519. externalTrafficOnlyArgs := append(args,
  1520. "-m", "physdev", "!", "--physdev-is-in",
  1521. "-m", "addrtype", "!", "--src-type", "LOCAL")
  1522. writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
  1523. dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  1524. // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
  1525. // This covers cases like GCE load-balancers which get added to the local routing table.
  1526. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
  1527. }
  1528. // -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
  1529. args = append(args[:0],
  1530. "-A", string(kubeServicesChain),
  1531. "-m", "addrtype", "--dst-type", "LOCAL",
  1532. )
  1533. writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
  1534. // mark drop for KUBE-LOAD-BALANCER
  1535. writeLine(proxier.natRules, []string{
  1536. "-A", string(KubeLoadBalancerChain),
  1537. "-j", string(KubeMarkMasqChain),
  1538. }...)
  1539. // mark drop for KUBE-FIRE-WALL
  1540. writeLine(proxier.natRules, []string{
  1541. "-A", string(KubeFireWallChain),
  1542. "-j", string(KubeMarkDropChain),
  1543. }...)
  1544. // Accept all traffic with destination of ipvs virtual service, in case other iptables rules
  1545. // block the traffic, that may result in ipvs rules invalid.
  1546. // Those rules must be in the end of KUBE-SERVICE chain
  1547. proxier.acceptIPVSTraffic()
  1548. // If the masqueradeMark has been added then we want to forward that same
  1549. // traffic, this allows NodePort traffic to be forwarded even if the default
  1550. // FORWARD policy is not accept.
  1551. writeLine(proxier.filterRules,
  1552. "-A", string(KubeForwardChain),
  1553. "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  1554. "-m", "mark", "--mark", proxier.masqueradeMark,
  1555. "-j", "ACCEPT",
  1556. )
  1557. // The following two rules ensure the traffic after the initial packet
  1558. // accepted by the "kubernetes forwarding rules" rule above will be
  1559. // accepted.
  1560. writeLine(proxier.filterRules,
  1561. "-A", string(KubeForwardChain),
  1562. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
  1563. "-m", "conntrack",
  1564. "--ctstate", "RELATED,ESTABLISHED",
  1565. "-j", "ACCEPT",
  1566. )
  1567. writeLine(proxier.filterRules,
  1568. "-A", string(KubeForwardChain),
  1569. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
  1570. "-m", "conntrack",
  1571. "--ctstate", "RELATED,ESTABLISHED",
  1572. "-j", "ACCEPT",
  1573. )
  1574. // Write the end-of-table markers.
  1575. writeLine(proxier.filterRules, "COMMIT")
  1576. writeLine(proxier.natRules, "COMMIT")
  1577. }
  1578. func (proxier *Proxier) acceptIPVSTraffic() {
  1579. sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
  1580. for _, set := range sets {
  1581. var matchType string
  1582. if !proxier.ipsetList[set].isEmpty() {
  1583. switch proxier.ipsetList[set].SetType {
  1584. case utilipset.BitmapPort:
  1585. matchType = "dst"
  1586. default:
  1587. matchType = "dst,dst"
  1588. }
  1589. writeLine(proxier.natRules, []string{
  1590. "-A", string(kubeServicesChain),
  1591. "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
  1592. "-j", "ACCEPT",
  1593. }...)
  1594. }
  1595. }
  1596. }
  1597. // createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link.
  1598. func (proxier *Proxier) createAndLinkeKubeChain() {
  1599. existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
  1600. existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
  1601. // Make sure we keep stats for the top-level chains
  1602. for _, ch := range iptablesChains {
  1603. if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
  1604. klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
  1605. return
  1606. }
  1607. if ch.table == utiliptables.TableNAT {
  1608. if chain, ok := existingNATChains[ch.chain]; ok {
  1609. writeBytesLine(proxier.natChains, chain)
  1610. } else {
  1611. writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
  1612. }
  1613. } else {
  1614. if chain, ok := existingFilterChains[KubeForwardChain]; ok {
  1615. writeBytesLine(proxier.filterChains, chain)
  1616. } else {
  1617. writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
  1618. }
  1619. }
  1620. }
  1621. for _, jc := range iptablesJumpChain {
  1622. args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
  1623. if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
  1624. klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
  1625. }
  1626. }
  1627. // Install the kubernetes-specific postrouting rules. We use a whole chain for
  1628. // this so that it is easier to flush and change, for example if the mark
  1629. // value should ever change.
  1630. // NB: THIS MUST MATCH the corresponding code in the kubelet
  1631. masqRule := []string{
  1632. "-A", string(kubePostroutingChain),
  1633. "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
  1634. "-m", "mark", "--mark", proxier.masqueradeMark,
  1635. "-j", "MASQUERADE",
  1636. }
  1637. if proxier.iptables.HasRandomFully() {
  1638. masqRule = append(masqRule, "--random-fully")
  1639. klog.V(3).Info("Using `--random-fully` in the MASQUERADE rule for iptables")
  1640. } else {
  1641. klog.V(2).Info("Not using `--random-fully` in the MASQUERADE rule for iptables because the local version of iptables does not support it")
  1642. }
  1643. writeLine(proxier.natRules, masqRule...)
  1644. // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
  1645. // this so that it is easier to flush and change, for example if the mark
  1646. // value should ever change.
  1647. writeLine(proxier.natRules, []string{
  1648. "-A", string(KubeMarkMasqChain),
  1649. "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
  1650. }...)
  1651. }
  1652. // getExistingChains get iptables-save output so we can check for existing chains and rules.
  1653. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
  1654. // Result may SHARE memory with contents of buffer.
  1655. func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
  1656. buffer.Reset()
  1657. err := proxier.iptables.SaveInto(table, buffer)
  1658. if err != nil { // if we failed to get any rules
  1659. klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
  1660. } else { // otherwise parse the output
  1661. return utiliptables.GetChainLines(table, buffer.Bytes())
  1662. }
  1663. return nil
  1664. }
  1665. // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
  1666. // risk sending more traffic to it, all of which will be lost (because UDP).
  1667. // This assumes the proxier mutex is held
  1668. func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
  1669. for _, epSvcPair := range connectionMap {
  1670. if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
  1671. endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
  1672. err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
  1673. if err != nil {
  1674. klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
  1675. }
  1676. for _, extIP := range svcInfo.ExternalIPStrings() {
  1677. err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
  1678. if err != nil {
  1679. klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
  1680. }
  1681. }
  1682. for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
  1683. err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
  1684. if err != nil {
  1685. klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
  1686. }
  1687. }
  1688. }
  1689. }
  1690. }
  1691. func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
  1692. appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  1693. if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
  1694. if appliedVirtualServer == nil {
  1695. // IPVS service is not found, create a new service
  1696. klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
  1697. if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
  1698. klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
  1699. return err
  1700. }
  1701. } else {
  1702. // IPVS service was changed, update the existing one
  1703. // During updates, service VIP will not go down
  1704. klog.V(3).Infof("IPVS service %s was changed", svcName)
  1705. if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
  1706. klog.Errorf("Failed to update IPVS service, err:%v", err)
  1707. return err
  1708. }
  1709. }
  1710. }
  1711. // bind service address to dummy interface even if service not changed,
  1712. // in case that service IP was removed by other processes
  1713. if bindAddr {
  1714. klog.V(4).Infof("Bind addr %s", vs.Address.String())
  1715. _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
  1716. if err != nil {
  1717. klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
  1718. return err
  1719. }
  1720. }
  1721. return nil
  1722. }
  1723. func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
  1724. appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
  1725. if err != nil || appliedVirtualServer == nil {
  1726. klog.Errorf("Failed to get IPVS service, error: %v", err)
  1727. return err
  1728. }
  1729. // curEndpoints represents IPVS destinations listed from current system.
  1730. curEndpoints := sets.NewString()
  1731. // newEndpoints represents Endpoints watched from API Server.
  1732. newEndpoints := sets.NewString()
  1733. curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
  1734. if err != nil {
  1735. klog.Errorf("Failed to list IPVS destinations, error: %v", err)
  1736. return err
  1737. }
  1738. for _, des := range curDests {
  1739. curEndpoints.Insert(des.String())
  1740. }
  1741. endpoints := proxier.endpointsMap[svcPortName]
  1742. // Service Topology will not be enabled in the following cases:
  1743. // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
  1744. // 2. ServiceTopology is not enabled.
  1745. // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
  1746. // to get topology information).
  1747. if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
  1748. endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
  1749. }
  1750. for _, epInfo := range endpoints {
  1751. if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
  1752. continue
  1753. }
  1754. newEndpoints.Insert(epInfo.String())
  1755. }
  1756. // Create new endpoints
  1757. for _, ep := range newEndpoints.List() {
  1758. ip, port, err := net.SplitHostPort(ep)
  1759. if err != nil {
  1760. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1761. continue
  1762. }
  1763. portNum, err := strconv.Atoi(port)
  1764. if err != nil {
  1765. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1766. continue
  1767. }
  1768. newDest := &utilipvs.RealServer{
  1769. Address: net.ParseIP(ip),
  1770. Port: uint16(portNum),
  1771. Weight: 1,
  1772. }
  1773. if curEndpoints.Has(ep) {
  1774. // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
  1775. uniqueRS := GetUniqueRSName(vs, newDest)
  1776. if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1777. continue
  1778. }
  1779. klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
  1780. err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
  1781. if err != nil {
  1782. klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
  1783. continue
  1784. }
  1785. }
  1786. err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
  1787. if err != nil {
  1788. klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
  1789. continue
  1790. }
  1791. }
  1792. // Delete old endpoints
  1793. for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
  1794. // if curEndpoint is in gracefulDelete, skip
  1795. uniqueRS := vs.String() + "/" + ep
  1796. if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1797. continue
  1798. }
  1799. ip, port, err := net.SplitHostPort(ep)
  1800. if err != nil {
  1801. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1802. continue
  1803. }
  1804. portNum, err := strconv.Atoi(port)
  1805. if err != nil {
  1806. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1807. continue
  1808. }
  1809. delDest := &utilipvs.RealServer{
  1810. Address: net.ParseIP(ip),
  1811. Port: uint16(portNum),
  1812. }
  1813. klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
  1814. err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
  1815. if err != nil {
  1816. klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
  1817. continue
  1818. }
  1819. }
  1820. return nil
  1821. }
  1822. func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
  1823. isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
  1824. for cs := range currentServices {
  1825. svc := currentServices[cs]
  1826. if proxier.isIPInExcludeCIDRs(svc.Address) {
  1827. continue
  1828. }
  1829. if utilnet.IsIPv6(svc.Address) != isIPv6 {
  1830. // Not our family
  1831. continue
  1832. }
  1833. if _, ok := activeServices[cs]; !ok {
  1834. klog.V(4).Infof("Delete service %s", svc.String())
  1835. if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
  1836. klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
  1837. }
  1838. addr := svc.Address.String()
  1839. if _, ok := legacyBindAddrs[addr]; ok {
  1840. klog.V(4).Infof("Unbinding address %s", addr)
  1841. if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
  1842. klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
  1843. } else {
  1844. // In case we delete a multi-port service, avoid trying to unbind multiple times
  1845. delete(legacyBindAddrs, addr)
  1846. }
  1847. }
  1848. }
  1849. }
  1850. }
  1851. func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
  1852. // make sure it does not fall within an excluded CIDR range.
  1853. for _, excludedCIDR := range proxier.excludeCIDRs {
  1854. if excludedCIDR.Contains(ip) {
  1855. return true
  1856. }
  1857. }
  1858. return false
  1859. }
  1860. func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
  1861. legacyAddrs := make(map[string]bool)
  1862. isIpv6 := utilnet.IsIPv6(proxier.nodeIP)
  1863. for _, addr := range currentBindAddrs {
  1864. addrIsIpv6 := utilnet.IsIPv6(net.ParseIP(addr))
  1865. if addrIsIpv6 && !isIpv6 || !addrIsIpv6 && isIpv6 {
  1866. continue
  1867. }
  1868. if _, ok := activeBindAddrs[addr]; !ok {
  1869. legacyAddrs[addr] = true
  1870. }
  1871. }
  1872. return legacyAddrs
  1873. }
  1874. // Join all words with spaces, terminate with newline and write to buff.
  1875. func writeLine(buf *bytes.Buffer, words ...string) {
  1876. // We avoid strings.Join for performance reasons.
  1877. for i := range words {
  1878. buf.WriteString(words[i])
  1879. if i < len(words)-1 {
  1880. buf.WriteByte(' ')
  1881. } else {
  1882. buf.WriteByte('\n')
  1883. }
  1884. }
  1885. }
  1886. func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
  1887. buf.Write(bytes)
  1888. buf.WriteByte('\n')
  1889. }
  1890. // listenPortOpener opens ports by calling bind() and listen().
  1891. type listenPortOpener struct{}
  1892. // OpenLocalPort holds the given local port open.
  1893. func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
  1894. return openLocalPort(lp, isIPv6)
  1895. }
  1896. func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
  1897. // For ports on node IPs, open the actual port and hold it, even though we
  1898. // use ipvs to redirect traffic.
  1899. // This ensures a) that it's safe to use that port and b) that (a) stays
  1900. // true. The risk is that some process on the node (e.g. sshd or kubelet)
  1901. // is using a port and we give that same port out to a Service. That would
  1902. // be bad because ipvs would silently claim the traffic but the process
  1903. // would never know.
  1904. // NOTE: We should not need to have a real listen()ing socket - bind()
  1905. // should be enough, but I can't figure out a way to e2e test without
  1906. // it. Tools like 'ss' and 'netstat' do not show sockets that are
  1907. // bind()ed but not listen()ed, and at least the default debian netcat
  1908. // has no way to avoid about 10 seconds of retries.
  1909. var socket utilproxy.Closeable
  1910. switch lp.Protocol {
  1911. case "tcp":
  1912. network := "tcp4"
  1913. if isIPv6 {
  1914. network = "tcp6"
  1915. }
  1916. listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1917. if err != nil {
  1918. return nil, err
  1919. }
  1920. socket = listener
  1921. case "udp":
  1922. network := "udp4"
  1923. if isIPv6 {
  1924. network = "udp6"
  1925. }
  1926. addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1927. if err != nil {
  1928. return nil, err
  1929. }
  1930. conn, err := net.ListenUDP(network, addr)
  1931. if err != nil {
  1932. return nil, err
  1933. }
  1934. socket = conn
  1935. default:
  1936. return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
  1937. }
  1938. klog.V(2).Infof("Opened local port %s", lp.String())
  1939. return socket, nil
  1940. }
1941. // ipvs Proxier falls back on iptables when it needs to do SNAT for egress packets
  1942. // It will only operate iptables *nat table.
  1943. // Create and link the kube postrouting chain for SNAT packets.
  1944. // Chain POSTROUTING (policy ACCEPT)
  1945. // target prot opt source destination
1946. // KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
1947. // Maintained by the kubelet network sync loop
  1948. // *nat
  1949. // :KUBE-POSTROUTING - [0:0]
  1950. // Chain KUBE-POSTROUTING (1 references)
  1951. // target prot opt source destination
  1952. // MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
  1953. // :KUBE-MARK-MASQ - [0:0]
  1954. // Chain KUBE-MARK-MASQ (0 references)
  1955. // target prot opt source destination
  1956. // MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000