proxier.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io/ioutil"
  18. "net"
  19. "regexp"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "k8s.io/klog"
  26. "k8s.io/api/core/v1"
  27. "k8s.io/apimachinery/pkg/types"
  28. "k8s.io/apimachinery/pkg/util/sets"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. "k8s.io/client-go/tools/record"
  31. "k8s.io/kubernetes/pkg/proxy"
  32. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  33. "k8s.io/kubernetes/pkg/proxy/metrics"
  34. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  35. "k8s.io/kubernetes/pkg/util/async"
  36. "k8s.io/kubernetes/pkg/util/conntrack"
  37. utilipset "k8s.io/kubernetes/pkg/util/ipset"
  38. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  39. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  40. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  41. utilexec "k8s.io/utils/exec"
  42. utilnet "k8s.io/utils/net"
  43. )
  44. const (
  45. // kubeServicesChain is the services portal chain
  46. kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
  47. // KubeFireWallChain is the kubernetes firewall chain.
  48. KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"
  49. // kubePostroutingChain is the kubernetes postrouting chain
  50. kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
  51. // KubeMarkMasqChain is the mark-for-masquerade chain
  52. KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
  53. // KubeNodePortChain is the kubernetes node port chain
  54. KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
  55. // KubeMarkDropChain is the mark-for-drop chain
  56. KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
  57. // KubeForwardChain is the kubernetes forward chain
  58. KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
  59. // KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
  60. KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
  61. // DefaultScheduler is the default ipvs scheduler algorithm - round robin.
  62. DefaultScheduler = "rr"
  63. // DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
  64. DefaultDummyDevice = "kube-ipvs0"
  65. )
  66. // iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
  67. // `to` is the iptables chain we want to operate.
  68. // `from` is the source iptables chain
  69. var iptablesJumpChain = []struct {
  70. table utiliptables.Table
  71. from utiliptables.Chain
  72. to utiliptables.Chain
  73. comment string
  74. }{
  75. {utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
  76. {utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
  77. {utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
  78. {utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
  79. }
  80. var iptablesChains = []struct {
  81. table utiliptables.Table
  82. chain utiliptables.Chain
  83. }{
  84. {utiliptables.TableNAT, kubeServicesChain},
  85. {utiliptables.TableNAT, kubePostroutingChain},
  86. {utiliptables.TableNAT, KubeFireWallChain},
  87. {utiliptables.TableNAT, KubeNodePortChain},
  88. {utiliptables.TableNAT, KubeLoadBalancerChain},
  89. {utiliptables.TableNAT, KubeMarkMasqChain},
  90. {utiliptables.TableFilter, KubeForwardChain},
  91. }
  92. // ipsetInfo is all ipset we needed in ipvs proxier
  93. var ipsetInfo = []struct {
  94. name string
  95. setType utilipset.Type
  96. comment string
  97. }{
  98. {kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
  99. {kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
  100. {kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
  101. {kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
  102. {kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
  103. {kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
  104. {kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
  105. {kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
  106. {kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
  107. {kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
  108. {kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
  109. {kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
  110. {kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
  111. {kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
  112. }
  113. // ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
  114. // `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
  115. // example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
  116. // ipsets with other match rules will be created Individually.
  117. // Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
  118. var ipsetWithIptablesChain = []struct {
  119. name string
  120. from string
  121. to string
  122. matchType string
  123. protocolMatch string
  124. }{
  125. {kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
  126. {kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""},
  127. {kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""},
  128. {kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  129. {kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  130. {kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""},
  131. {kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", "tcp"},
  132. {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "tcp"},
  133. {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", "udp"},
  134. {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "udp"},
  135. {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", "sctp"},
  136. {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", "sctp"},
  137. }
  // Sysctl keys that NewProxier checks (and, where needed, sets) when the ipvs
  // proxier starts.
  const (
  	sysctlRouteLocalnet           = "net/ipv4/conf/all/route_localnet"
  	sysctlBridgeCallIPTables      = "net/bridge/bridge-nf-call-iptables"
  	sysctlVSConnTrack             = "net/ipv4/vs/conntrack"
  	sysctlConnReuse               = "net/ipv4/vs/conn_reuse_mode"
  	sysctlExpireNoDestConn        = "net/ipv4/vs/expire_nodest_conn"
  	sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
  	sysctlForward                 = "net/ipv4/ip_forward"
  	sysctlArpIgnore               = "net/ipv4/conf/all/arp_ignore"
  	sysctlArpAnnounce             = "net/ipv4/conf/all/arp_announce"
  )
  148. // Proxier is an ipvs based proxy for connections between a localhost:lport
  149. // and services that provide the actual backends.
  150. type Proxier struct {
  151. // endpointsChanges and serviceChanges contains all changes to endpoints and
  152. // services that happened since last syncProxyRules call. For a single object,
  153. // changes are accumulated, i.e. previous is state from before all of them,
  154. // current is state after applying all of those.
  155. endpointsChanges *proxy.EndpointChangeTracker
  156. serviceChanges *proxy.ServiceChangeTracker
  157. mu sync.Mutex // protects the following fields
  158. serviceMap proxy.ServiceMap
  159. endpointsMap proxy.EndpointsMap
  160. portsMap map[utilproxy.LocalPort]utilproxy.Closeable
  161. // endpointsSynced and servicesSynced are set to true when corresponding
  162. // objects are synced after startup. This is used to avoid updating ipvs rules
  163. // with some partial data after kube-proxy restart.
  164. endpointsSynced bool
  165. servicesSynced bool
  166. initialized int32
  167. syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
  168. // These are effectively const and do not need the mutex to be held.
  169. syncPeriod time.Duration
  170. minSyncPeriod time.Duration
  171. // Values are CIDR's to exclude when cleaning up IPVS rules.
  172. excludeCIDRs []*net.IPNet
  173. // Set to true to set sysctls arp_ignore and arp_announce
  174. strictARP bool
  175. iptables utiliptables.Interface
  176. ipvs utilipvs.Interface
  177. ipset utilipset.Interface
  178. exec utilexec.Interface
  179. masqueradeAll bool
  180. masqueradeMark string
  181. clusterCIDR string
  182. hostname string
  183. nodeIP net.IP
  184. portMapper utilproxy.PortOpener
  185. recorder record.EventRecorder
  186. healthChecker healthcheck.Server
  187. healthzServer healthcheck.HealthzUpdater
  188. ipvsScheduler string
  189. // Added as a member to the struct to allow injection for testing.
  190. ipGetter IPGetter
  191. // The following buffers are used to reuse memory and avoid allocations
  192. // that are significantly impacting performance.
  193. iptablesData *bytes.Buffer
  194. filterChainsData *bytes.Buffer
  195. natChains *bytes.Buffer
  196. filterChains *bytes.Buffer
  197. natRules *bytes.Buffer
  198. filterRules *bytes.Buffer
  199. // Added as a member to the struct to allow injection for testing.
  200. netlinkHandle NetLinkHandle
  201. // ipsetList is the list of ipsets that ipvs proxier used.
  202. ipsetList map[string]*IPSet
  203. // Values are as a parameter to select the interfaces which nodeport works.
  204. nodePortAddresses []string
  205. // networkInterfacer defines an interface for several net library functions.
  206. // Inject for test purpose.
  207. networkInterfacer utilproxy.NetworkInterfacer
  208. gracefuldeleteManager *GracefulTerminationManager
  209. }
  210. // IPGetter helps get node network interface IP
  211. type IPGetter interface {
  212. NodeIPs() ([]net.IP, error)
  213. }
  214. // realIPGetter is a real NodeIP handler, it implements IPGetter.
  215. type realIPGetter struct {
  216. // nl is a handle for revoking netlink interface
  217. nl NetLinkHandle
  218. }
  219. // NodeIPs returns all LOCAL type IP addresses from host which are taken as the Node IPs of NodePort service.
  220. // It will list source IP exists in local route table with `kernel` protocol type, and filter out IPVS proxier
  221. // created dummy device `kube-ipvs0` For example,
  222. // $ ip route show table local type local proto kernel
  223. // 10.0.0.1 dev kube-ipvs0 scope host src 10.0.0.1
  224. // 10.0.0.10 dev kube-ipvs0 scope host src 10.0.0.10
  225. // 10.0.0.252 dev kube-ipvs0 scope host src 10.0.0.252
  226. // 100.106.89.164 dev eth0 scope host src 100.106.89.164
  227. // 127.0.0.0/8 dev lo scope host src 127.0.0.1
  228. // 127.0.0.1 dev lo scope host src 127.0.0.1
  229. // 172.17.0.1 dev docker0 scope host src 172.17.0.1
  230. // 192.168.122.1 dev virbr0 scope host src 192.168.122.1
  231. // Then filter out dev==kube-ipvs0, and cut the unique src IP fields,
  232. // Node IP set: [100.106.89.164, 127.0.0.1, 192.168.122.1]
  233. func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
  234. // Pass in empty filter device name for list all LOCAL type addresses.
  235. nodeAddress, err := r.nl.GetLocalAddresses("", DefaultDummyDevice)
  236. if err != nil {
  237. return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err)
  238. }
  239. // translate ip string to IP
  240. for _, ipStr := range nodeAddress.UnsortedList() {
  241. ips = append(ips, net.ParseIP(ipStr))
  242. }
  243. return ips, nil
  244. }
  245. // Proxier implements ProxyProvider
  246. var _ proxy.ProxyProvider = &Proxier{}
  247. // parseExcludedCIDRs parses the input strings and returns net.IPNet
  248. // The validation has been done earlier so the error condition will never happen under normal conditions
  249. func parseExcludedCIDRs(excludeCIDRs []string) []*net.IPNet {
  250. var cidrExclusions []*net.IPNet
  251. for _, excludedCIDR := range excludeCIDRs {
  252. _, n, err := net.ParseCIDR(excludedCIDR)
  253. if err != nil {
  254. klog.Errorf("Error parsing exclude CIDR %q, err: %v", excludedCIDR, err)
  255. continue
  256. }
  257. cidrExclusions = append(cidrExclusions, n)
  258. }
  259. return cidrExclusions
  260. }
  261. // NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
  262. // Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
  263. // An error will be returned if it fails to update or acquire the initial lock.
  264. // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
  265. // will not terminate if a particular iptables or ipvs call fails.
  266. func NewProxier(ipt utiliptables.Interface,
  267. ipvs utilipvs.Interface,
  268. ipset utilipset.Interface,
  269. sysctl utilsysctl.Interface,
  270. exec utilexec.Interface,
  271. syncPeriod time.Duration,
  272. minSyncPeriod time.Duration,
  273. excludeCIDRs []string,
  274. strictARP bool,
  275. masqueradeAll bool,
  276. masqueradeBit int,
  277. clusterCIDR string,
  278. hostname string,
  279. nodeIP net.IP,
  280. recorder record.EventRecorder,
  281. healthzServer healthcheck.HealthzUpdater,
  282. scheduler string,
  283. nodePortAddresses []string,
  284. ) (*Proxier, error) {
  285. // Set the route_localnet sysctl we need for
  286. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  287. if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
  288. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
  289. }
  290. }
  291. // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
  292. // are connected to a Linux bridge (but not SDN bridges). Until most
  293. // plugins handle this, log when config is missing
  294. if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
  295. klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
  296. }
  297. // Set the conntrack sysctl we need for
  298. if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
  299. if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
  300. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
  301. }
  302. }
  303. // Set the connection reuse mode
  304. if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
  305. if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
  306. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
  307. }
  308. }
  309. // Set the expire_nodest_conn sysctl we need for
  310. if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
  311. if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
  312. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
  313. }
  314. }
  315. // Set the expire_quiescent_template sysctl we need for
  316. if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
  317. if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
  318. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
  319. }
  320. }
  321. // Set the ip_forward sysctl we need for
  322. if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
  323. if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
  324. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
  325. }
  326. }
  327. if strictARP {
  328. // Set the arp_ignore sysctl we need for
  329. if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
  330. if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
  331. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
  332. }
  333. }
  334. // Set the arp_announce sysctl we need for
  335. if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
  336. if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
  337. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
  338. }
  339. }
  340. }
  341. // Generate the masquerade mark to use for SNAT rules.
  342. masqueradeValue := 1 << uint(masqueradeBit)
  343. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  344. if nodeIP == nil {
  345. klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
  346. nodeIP = net.ParseIP("127.0.0.1")
  347. }
  348. isIPv6 := utilnet.IsIPv6(nodeIP)
  349. klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
  350. if len(clusterCIDR) == 0 {
  351. klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
  352. } else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 {
  353. return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
  354. }
  355. if len(scheduler) == 0 {
  356. klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
  357. scheduler = DefaultScheduler
  358. }
  359. healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
  360. proxier := &Proxier{
  361. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  362. serviceMap: make(proxy.ServiceMap),
  363. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
  364. endpointsMap: make(proxy.EndpointsMap),
  365. endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
  366. syncPeriod: syncPeriod,
  367. minSyncPeriod: minSyncPeriod,
  368. excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),
  369. iptables: ipt,
  370. masqueradeAll: masqueradeAll,
  371. masqueradeMark: masqueradeMark,
  372. exec: exec,
  373. clusterCIDR: clusterCIDR,
  374. hostname: hostname,
  375. nodeIP: nodeIP,
  376. portMapper: &listenPortOpener{},
  377. recorder: recorder,
  378. healthChecker: healthChecker,
  379. healthzServer: healthzServer,
  380. ipvs: ipvs,
  381. ipvsScheduler: scheduler,
  382. ipGetter: &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
  383. iptablesData: bytes.NewBuffer(nil),
  384. filterChainsData: bytes.NewBuffer(nil),
  385. natChains: bytes.NewBuffer(nil),
  386. natRules: bytes.NewBuffer(nil),
  387. filterChains: bytes.NewBuffer(nil),
  388. filterRules: bytes.NewBuffer(nil),
  389. netlinkHandle: NewNetLinkHandle(isIPv6),
  390. ipset: ipset,
  391. nodePortAddresses: nodePortAddresses,
  392. networkInterfacer: utilproxy.RealNetwork{},
  393. gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
  394. }
  395. // initialize ipsetList with all sets we needed
  396. proxier.ipsetList = make(map[string]*IPSet)
  397. for _, is := range ipsetInfo {
  398. proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
  399. }
  400. burstSyncs := 2
  401. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  402. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  403. proxier.gracefuldeleteManager.Run()
  404. return proxier, nil
  405. }
  406. // internal struct for string service information
  407. type serviceInfo struct {
  408. *proxy.BaseServiceInfo
  409. // The following fields are computed and stored for performance reasons.
  410. serviceNameString string
  411. }
  412. // returns a new proxy.ServicePort which abstracts a serviceInfo
  413. func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
  414. info := &serviceInfo{BaseServiceInfo: baseInfo}
  415. // Store the following for performance reasons.
  416. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  417. svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
  418. info.serviceNameString = svcPortName.String()
  419. return info
  420. }
  421. // KernelHandler can handle the current installed kernel modules.
  422. type KernelHandler interface {
  423. GetModules() ([]string, error)
  424. }
  425. // LinuxKernelHandler implements KernelHandler interface.
  426. type LinuxKernelHandler struct {
  427. executor utilexec.Interface
  428. }
  429. // NewLinuxKernelHandler initializes LinuxKernelHandler with exec.
  430. func NewLinuxKernelHandler() *LinuxKernelHandler {
  431. return &LinuxKernelHandler{
  432. executor: utilexec.New(),
  433. }
  434. }
  435. // GetModules returns all installed kernel modules.
  436. func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
  437. // Check whether IPVS required kernel modules are built-in
  438. kernelVersion, ipvsModules, err := utilipvs.GetKernelVersionAndIPVSMods(handle.executor)
  439. if err != nil {
  440. return nil, err
  441. }
  442. builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersion)
  443. b, err := ioutil.ReadFile(builtinModsFilePath)
  444. if err != nil {
  445. klog.Warningf("Failed to read file %s with error %v. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", builtinModsFilePath, err)
  446. }
  447. var bmods []string
  448. for _, module := range ipvsModules {
  449. if match, _ := regexp.Match(module+".ko", b); match {
  450. bmods = append(bmods, module)
  451. }
  452. }
  453. // Try to load IPVS required kernel modules using modprobe first
  454. for _, kmod := range ipvsModules {
  455. err := handle.executor.Command("modprobe", "--", kmod).Run()
  456. if err != nil {
  457. klog.Warningf("Failed to load kernel module %v with modprobe. "+
  458. "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", kmod)
  459. }
  460. }
  461. // Find out loaded kernel modules
  462. out, err := handle.executor.Command("cut", "-f1", "-d", " ", "/proc/modules").CombinedOutput()
  463. if err != nil {
  464. return nil, err
  465. }
  466. mods := strings.Split(string(out), "\n")
  467. return append(mods, bmods...), nil
  468. }
  469. // CanUseIPVSProxier returns true if we can use the ipvs Proxier.
  470. // This is determined by checking if all the required kernel modules can be loaded. It may
  471. // return an error if it fails to get the kernel modules information without error, in which
  472. // case it will also return false.
  473. func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner) (bool, error) {
  474. mods, err := handle.GetModules()
  475. if err != nil {
  476. return false, fmt.Errorf("error getting installed ipvs required kernel modules: %v", err)
  477. }
  478. wantModules := sets.NewString()
  479. loadModules := sets.NewString()
  480. linuxKernelHandler := NewLinuxKernelHandler()
  481. _, ipvsModules, _ := utilipvs.GetKernelVersionAndIPVSMods(linuxKernelHandler.executor)
  482. wantModules.Insert(ipvsModules...)
  483. loadModules.Insert(mods...)
  484. modules := wantModules.Difference(loadModules).UnsortedList()
  485. var missingMods []string
  486. ConntrackiMissingCounter := 0
  487. for _, mod := range modules {
  488. if strings.Contains(mod, "nf_conntrack") {
  489. ConntrackiMissingCounter++
  490. } else {
  491. missingMods = append(missingMods, mod)
  492. }
  493. }
  494. if ConntrackiMissingCounter == 2 {
  495. missingMods = append(missingMods, "nf_conntrack_ipv4(or nf_conntrack for Linux kernel 4.19 and later)")
  496. }
  497. if len(missingMods) != 0 {
  498. return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", missingMods)
  499. }
  500. // Check ipset version
  501. versionString, err := ipsetver.GetVersion()
  502. if err != nil {
  503. return false, fmt.Errorf("error getting ipset version, error: %v", err)
  504. }
  505. if !checkMinVersion(versionString) {
  506. return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
  507. }
  508. return true, nil
  509. }
  510. // CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
  511. // It returns true if an error was encountered. Errors are logged.
  512. func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
  513. // Unlink the iptables chains created by ipvs Proxier
  514. for _, jc := range iptablesJumpChain {
  515. args := []string{
  516. "-m", "comment", "--comment", jc.comment,
  517. "-j", string(jc.to),
  518. }
  519. if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
  520. if !utiliptables.IsNotFoundError(err) {
  521. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  522. encounteredError = true
  523. }
  524. }
  525. }
  526. // Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first.
  527. for _, ch := range iptablesChains {
  528. if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
  529. if !utiliptables.IsNotFoundError(err) {
  530. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  531. encounteredError = true
  532. }
  533. }
  534. }
  535. // Remove all of our chains.
  536. for _, ch := range iptablesChains {
  537. if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
  538. if !utiliptables.IsNotFoundError(err) {
  539. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  540. encounteredError = true
  541. }
  542. }
  543. }
  544. return encounteredError
  545. }
  546. // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
  547. func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface, cleanupIPVS bool) (encounteredError bool) {
  548. if cleanupIPVS {
  549. // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere.
  550. if ipvs == nil {
  551. return true
  552. }
  553. encounteredError = false
  554. err := ipvs.Flush()
  555. if err != nil {
  556. klog.Errorf("Error flushing IPVS rules: %v", err)
  557. encounteredError = true
  558. }
  559. }
  560. // Delete dummy interface created by ipvs Proxier.
  561. nl := NewNetLinkHandle(false)
  562. err := nl.DeleteDummyDevice(DefaultDummyDevice)
  563. if err != nil {
  564. klog.Errorf("Error deleting dummy device %s created by IPVS proxier: %v", DefaultDummyDevice, err)
  565. encounteredError = true
  566. }
  567. // Clear iptables created by ipvs Proxier.
  568. encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
  569. // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
  570. // iptables since we can NOT delete ip set which is still referenced by iptables.
  571. for _, set := range ipsetInfo {
  572. err = ipset.DestroySet(set.name)
  573. if err != nil {
  574. if !utilipset.IsNotFoundError(err) {
  575. klog.Errorf("Error removing ipset %s, error: %v", set.name, err)
  576. encounteredError = true
  577. }
  578. }
  579. }
  580. return encounteredError
  581. }
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
// It only asks proxier.syncRunner to run; the actual work is whatever the
// runner was configured to invoke.
func (proxier *Proxier) Sync() {
	proxier.syncRunner.Run()
}
  586. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  587. func (proxier *Proxier) SyncLoop() {
  588. // Update healthz timestamp at beginning in case Sync() never succeeds.
  589. if proxier.healthzServer != nil {
  590. proxier.healthzServer.UpdateTimestamp()
  591. }
  592. proxier.syncRunner.Loop(wait.NeverStop)
  593. }
  594. func (proxier *Proxier) setInitialized(value bool) {
  595. var initialized int32
  596. if value {
  597. initialized = 1
  598. }
  599. atomic.StoreInt32(&proxier.initialized, initialized)
  600. }
  601. func (proxier *Proxier) isInitialized() bool {
  602. return atomic.LoadInt32(&proxier.initialized) > 0
  603. }
// OnServiceAdd is called whenever creation of new service object is observed.
// It is handled as an update with no previous object (nil old service).
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	proxier.OnServiceUpdate(nil, service)
}
  608. // OnServiceUpdate is called whenever modification of an existing service object is observed.
  609. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  610. if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
  611. proxier.syncRunner.Run()
  612. }
  613. }
// OnServiceDelete is called whenever deletion of an existing service object is observed.
// It is handled as an update whose new object is nil.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
	proxier.OnServiceUpdate(service, nil)
}
  618. // OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  619. func (proxier *Proxier) OnServiceSynced() {
  620. proxier.mu.Lock()
  621. proxier.servicesSynced = true
  622. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  623. proxier.mu.Unlock()
  624. // Sync unconditionally - this is called once per lifetime.
  625. proxier.syncProxyRules()
  626. }
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
// It is handled as an update with no previous object (nil old endpoints).
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
	proxier.OnEndpointsUpdate(nil, endpoints)
}
  631. // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
  632. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  633. if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
  634. proxier.syncRunner.Run()
  635. }
  636. }
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
// It is handled as an update whose new object is nil.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
	proxier.OnEndpointsUpdate(endpoints, nil)
}
  641. // OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  642. func (proxier *Proxier) OnEndpointsSynced() {
  643. proxier.mu.Lock()
  644. proxier.endpointsSynced = true
  645. proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
  646. proxier.mu.Unlock()
  647. // Sync unconditionally - this is called once per lifetime.
  648. proxier.syncProxyRules()
  649. }
// EntryInvalidErr is the format string used to report an ipset entry that
// failed validation. It takes two %s arguments: the entry and the ipset name.
const EntryInvalidErr = "error adding entry %s to ipset %s"
// syncProxyRules is where all of the ipvs calls happen: it rebuilds the ipset
// entries, IPVS virtual servers and iptables rules from proxier.serviceMap and
// proxier.endpointsMap, then cleans up whatever is no longer needed.
//
// The function locks proxier.mu itself and holds it until it returns (an older
// note here said the lock is assumed to be held by the caller; that is stale).
// It returns nothing; failures are logged and either abort the sync (early
// return) or skip the affected service/address (continue).
func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	// Record how long the sync took, whichever way the function exits.
	start := time.Now()
	defer func() {
		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
		metrics.DeprecatedSyncProxyRulesLatency.Observe(metrics.SinceInMicroseconds(start))
		klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
	}()
	// don't sync rules till we've received services and endpoints
	if !proxier.endpointsSynced || !proxier.servicesSynced {
		klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
		return
	}

	// We assume that if this was called, we really want to sync them,
	// even if nothing changed in the meantime. In other words, callers are
	// responsible for detecting no-op changes and not calling this function.
	serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	staleServices := serviceUpdateResult.UDPStaleClusterIP
	// merge stale services gathered from updateEndpointsMap
	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
			klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
			staleServices.Insert(svcInfo.ClusterIPString())
			for _, extIP := range svcInfo.ExternalIPStrings() {
				staleServices.Insert(extIP)
			}
		}
	}

	klog.V(3).Infof("Syncing ipvs Proxier rules")

	// Begin install iptables

	// Reset all buffers used later.
	// This is to avoid memory reallocations and thus improve performance.
	proxier.natChains.Reset()
	proxier.natRules.Reset()
	proxier.filterChains.Reset()
	proxier.filterRules.Reset()

	// Write table headers.
	writeLine(proxier.filterChains, "*filter")
	writeLine(proxier.natChains, "*nat")

	proxier.createAndLinkeKubeChain()

	// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
	_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
	if err != nil {
		klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
		return
	}

	// make sure ip sets exist in the system.
	for _, set := range proxier.ipsetList {
		if err := ensureIPSet(set); err != nil {
			// NOTE(review): the sync is aborted here without logging; presumably
			// ensureIPSet logs the failure itself - confirm.
			return
		}
		set.resetEntries()
	}

	// Accumulate the set of local ports that we will be holding open once this update is complete
	replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
	// activeIPVSServices represents IPVS service successfully created in this round of sync
	activeIPVSServices := map[string]bool{}
	// currentIPVSServices represent IPVS services listed from the system
	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
	// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
	activeBindAddrs := map[string]bool{}

	// Build IPVS rules for each service.
	for svcName, svc := range proxier.serviceMap {
		svcInfo, ok := svc.(*serviceInfo)
		if !ok {
			klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
			continue
		}
		protocol := strings.ToLower(string(svcInfo.Protocol))
		// Precompute svcNameString; with many services the many calls
		// to ServicePortName.String() show up in CPU profiles.
		svcNameString := svcName.String()

		// Handle traffic that loops back to the originator with SNAT.
		for _, e := range proxier.endpointsMap[svcName] {
			ep, ok := e.(*proxy.BaseEndpointInfo)
			if !ok {
				klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
				continue
			}
			// Only endpoints local to this node get a loopback entry.
			if !ep.IsLocal {
				continue
			}
			epIP := ep.IP()
			epPort, err := ep.Port()
			// Error parsing this endpoint has been logged. Skip to next endpoint.
			if epIP == "" || err != nil {
				continue
			}
			// The endpoint IP is used as both IP and IP2 of the entry.
			entry := &utilipset.Entry{
				IP:       epIP,
				Port:     epPort,
				Protocol: protocol,
				IP2:      epIP,
				SetType:  utilipset.HashIPPortIP,
			}
			if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
				klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
				continue
			}
			proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
		}

		// Capture the clusterIP.
		// ipset call
		entry := &utilipset.Entry{
			IP:       svcInfo.ClusterIP.String(),
			Port:     svcInfo.Port,
			Protocol: protocol,
			SetType:  utilipset.HashIPPort,
		}
		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
			klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
			// This continue is at service-loop level: the rest of this service
			// (external IPs, load balancer ingress, node port) is skipped.
			continue
		}
		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
		// ipvs call
		serv := &utilipvs.VirtualServer{
			Address:   svcInfo.ClusterIP,
			Port:      uint16(svcInfo.Port),
			Protocol:  string(svcInfo.Protocol),
			Scheduler: proxier.ipvsScheduler,
		}
		// Set session affinity flag and timeout for IPVS service
		if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
			serv.Flags |= utilipvs.FlagPersistent
			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
		}
		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
		if err := proxier.syncService(svcNameString, serv, true); err == nil {
			activeIPVSServices[serv.String()] = true
			activeBindAddrs[serv.Address.String()] = true
			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
			if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
				klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
			}
		} else {
			klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
		}

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPs {
			if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
				klog.Errorf("can't determine if IP is local, assuming not: %v", err)
				// We do not start listening on SCTP ports, according to our agreement in the
				// SCTP support KEP
			} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
				lp := utilproxy.LocalPort{
					Description: "externalIP for " + svcNameString,
					IP:          externalIP,
					Port:        svcInfo.Port,
					Protocol:    protocol,
				}
				if proxier.portsMap[lp] != nil {
					klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
					replacementPortsMap[lp] = proxier.portsMap[lp]
				} else {
					socket, err := proxier.portMapper.OpenLocalPort(&lp)
					if err != nil {
						msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)

						proxier.recorder.Eventf(
							&v1.ObjectReference{
								Kind:      "Node",
								Name:      proxier.hostname,
								UID:       types.UID(proxier.hostname),
								Namespace: "",
							}, v1.EventTypeWarning, err.Error(), msg)
						klog.Error(msg)
						// Skip only this external IP; the next one is still processed.
						continue
					}
					replacementPortsMap[lp] = socket
				}
			} // We're holding the port, so it's OK to install IPVS rules.

			// ipset call
			// This entry shadows the service-level entry for the rest of this iteration.
			entry := &utilipset.Entry{
				IP:       externalIP,
				Port:     svcInfo.Port,
				Protocol: protocol,
				SetType:  utilipset.HashIPPort,
			}
			// We have to SNAT packets to external IPs.
			if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
				klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
				continue
			}
			proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())

			// ipvs call
			serv := &utilipvs.VirtualServer{
				Address:   net.ParseIP(externalIP),
				Port:      uint16(svcInfo.Port),
				Protocol:  string(svcInfo.Protocol),
				Scheduler: proxier.ipvsScheduler,
			}
			if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
				serv.Flags |= utilipvs.FlagPersistent
				serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
			}
			if err := proxier.syncService(svcNameString, serv, true); err == nil {
				activeIPVSServices[serv.String()] = true
				activeBindAddrs[serv.Address.String()] = true
				if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
					klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
				}
			} else {
				klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
			}
		}

		// Capture load-balancer ingress.
		// Every continue inside this loop (outside the source-range loop) moves on
		// to the next ingress, skipping the IPVS virtual server for the current one.
		for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
			if ingress.IP != "" {
				// ipset call
				// Assigns to the service-level entry variable (no new declaration).
				entry = &utilipset.Entry{
					IP:       ingress.IP,
					Port:     svcInfo.Port,
					Protocol: protocol,
					SetType:  utilipset.HashIPPort,
				}
				// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
				// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
				// If we are proxying globally, we need to masquerade in case we cross nodes.
				// If we are proxying only locally, we can retain the source IP.
				if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
					klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
					continue
				}
				proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
				// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
				if svcInfo.OnlyNodeLocalEndpoints {
					if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
						klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
						continue
					}
					proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
				}
				if len(svcInfo.LoadBalancerSourceRanges) != 0 {
					// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
					// This currently works for loadbalancers that preserves source ips.
					// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
					if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
						klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
						continue
					}
					proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
					allowFromNode := false
					for _, src := range svcInfo.LoadBalancerSourceRanges {
						// ipset call
						entry = &utilipset.Entry{
							IP:       ingress.IP,
							Port:     svcInfo.Port,
							Protocol: protocol,
							Net:      src,
							SetType:  utilipset.HashIPPortNet,
						}
						// enumerate all white list source cidr
						if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
							klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
							// Skips only this source range.
							continue
						}
						proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())

						// ignore error because it has been validated
						// NOTE(review): cidr would be nil if parsing failed, so this relies on
						// validateEntry above having rejected unparsable ranges - confirm.
						_, cidr, _ := net.ParseCIDR(src)
						if cidr.Contains(proxier.nodeIP) {
							allowFromNode = true
						}
					}
					// generally, ip route rule was added to intercept request to loadbalancer vip from the
					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
					// Need to add the following rule to allow request on host.
					if allowFromNode {
						entry = &utilipset.Entry{
							IP:       ingress.IP,
							Port:     svcInfo.Port,
							Protocol: protocol,
							IP2:      ingress.IP,
							SetType:  utilipset.HashIPPortIP,
						}
						// enumerate all white list source ip
						if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
							klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name))
							continue
						}
						proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
					}
				}

				// ipvs call
				serv := &utilipvs.VirtualServer{
					Address:   net.ParseIP(ingress.IP),
					Port:      uint16(svcInfo.Port),
					Protocol:  string(svcInfo.Protocol),
					Scheduler: proxier.ipvsScheduler,
				}
				if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
					serv.Flags |= utilipvs.FlagPersistent
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
				}
				if err := proxier.syncService(svcNameString, serv, true); err == nil {
					activeIPVSServices[serv.String()] = true
					activeBindAddrs[serv.Address.String()] = true
					if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil {
						klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
					}
				} else {
					klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
				}
			}
		}

		if svcInfo.NodePort != 0 {
			// err declared here is local to this block and shadows the function-level err.
			addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
			if err != nil {
				klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
				continue
			}

			var lps []utilproxy.LocalPort
			for address := range addresses {
				lp := utilproxy.LocalPort{
					Description: "nodePort for " + svcNameString,
					IP:          address,
					Port:        svcInfo.NodePort,
					Protocol:    protocol,
				}
				if utilproxy.IsZeroCIDR(address) {
					// Empty IP address means all
					lp.IP = ""
					lps = append(lps, lp)
					// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
					break
				}
				lps = append(lps, lp)
			}

			// For ports on node IPs, open the actual port and hold it.
			for _, lp := range lps {
				if proxier.portsMap[lp] != nil {
					klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
					replacementPortsMap[lp] = proxier.portsMap[lp]
					// We do not start listening on SCTP ports, according to our agreement in the
					// SCTP support KEP
				} else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
					socket, err := proxier.portMapper.OpenLocalPort(&lp)
					if err != nil {
						klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
						continue
					}
					if lp.Protocol == "udp" {
						isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
						conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
					}
					replacementPortsMap[lp] = socket
				} // We're holding the port, so it's OK to install ipvs rules.
			}

			// Nodeports need SNAT, unless they're local.
			// ipset call
			// nodePortSet stays nil for an unsupported protocol, in which case no
			// entry is inserted below.
			var nodePortSet *IPSet
			switch protocol {
			case "tcp":
				nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
				entry = &utilipset.Entry{
					// No need to provide ip info
					Port:     svcInfo.NodePort,
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}
			case "udp":
				nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
				entry = &utilipset.Entry{
					// No need to provide ip info
					Port:     svcInfo.NodePort,
					Protocol: protocol,
					SetType:  utilipset.BitmapPort,
				}
			case "sctp":
				nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
				entry = &utilipset.Entry{
					IP:       proxier.nodeIP.String(),
					Port:     svcInfo.NodePort,
					Protocol: protocol,
					SetType:  utilipset.HashIPPort,
				}
			default:
				// It should never hit
				klog.Errorf("Unsupported protocol type: %s", protocol)
			}
			if nodePortSet != nil {
				if valid := nodePortSet.validateEntry(entry); !valid {
					klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
					// Service-loop continue: no node port virtual servers are created for this service.
					continue
				}
				nodePortSet.activeEntries.Insert(entry.String())
			}

			// Add externaltrafficpolicy=local type nodeport entry
			if svcInfo.OnlyNodeLocalEndpoints {
				var nodePortLocalSet *IPSet
				switch protocol {
				case "tcp":
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
				case "udp":
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
				case "sctp":
					nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
				default:
					// It should never hit
					klog.Errorf("Unsupported protocol type: %s", protocol)
				}
				if nodePortLocalSet != nil {
					// The same entry built for nodePortSet above is reused here.
					if valid := nodePortLocalSet.validateEntry(entry); !valid {
						klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
						continue
					}
					nodePortLocalSet.activeEntries.Insert(entry.String())
				}
			}

			// Build ipvs kernel routes for each node ip address
			var nodeIPs []net.IP
			for address := range addresses {
				if !utilproxy.IsZeroCIDR(address) {
					nodeIPs = append(nodeIPs, net.ParseIP(address))
					continue
				}
				// zero cidr
				// The result replaces (not extends) whatever nodeIPs held so far.
				nodeIPs, err = proxier.ipGetter.NodeIPs()
				if err != nil {
					klog.Errorf("Failed to list all node IPs from host, err: %v", err)
				}
			}
			for _, nodeIP := range nodeIPs {
				// ipvs call
				serv := &utilipvs.VirtualServer{
					Address:   nodeIP,
					Port:      uint16(svcInfo.NodePort),
					Protocol:  string(svcInfo.Protocol),
					Scheduler: proxier.ipvsScheduler,
				}
				if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
					serv.Flags |= utilipvs.FlagPersistent
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
				}
				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
				if err := proxier.syncService(svcNameString, serv, false); err == nil {
					activeIPVSServices[serv.String()] = true
					if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil {
						klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
					}
				} else {
					klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
				}
			}
		}
	}

	// sync ipset entries
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}

	// Tail call iptables rules for ipset, make sure only call iptables once
	// in a single loop per ip set.
	proxier.writeIptablesRules()

	// Sync iptables rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
	// The restore payload is assembled in the order nat chains, nat rules,
	// filter chains, filter rules.
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())

	klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	if err != nil {
		klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
		// Revert new local ports.
		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
		return
	}
	for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
		latency := metrics.SinceInSeconds(lastChangeTriggerTime)
		metrics.NetworkProgrammingLatency.Observe(latency)
		klog.V(4).Infof("Network programming took %f seconds", latency)
	}

	// Close old local ports and save new ones.
	for k, v := range proxier.portsMap {
		if replacementPortsMap[k] == nil {
			v.Close()
		}
	}
	proxier.portsMap = replacementPortsMap

	// Get legacy bind address
	// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
	// A failure here is only logged; the cleanup below still runs with whatever was returned.
	currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
	if err != nil {
		klog.Errorf("Failed to get bind address, err: %v", err)
	}
	legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)

	// Clean up legacy IPVS services and unbind addresses
	appliedSvcs, err := proxier.ipvs.GetVirtualServers()
	if err == nil {
		for _, appliedSvc := range appliedSvcs {
			currentIPVSServices[appliedSvc.String()] = appliedSvc
		}
	} else {
		klog.Errorf("Failed to get ipvs service, err: %v", err)
	}
	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)

	// Update healthz timestamp
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

	// Update healthchecks. The endpoints list might include services that are
	// not "OnlyLocal", but the services list will not, and the healthChecker
	// will just drop those endpoints.
	if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
		klog.Errorf("Error syncing healthcheck services: %v", err)
	}
	if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
		klog.Errorf("Error syncing healthcheck endpoints: %v", err)
	}

	// Finish housekeeping.
	// TODO: these could be made more consistent.
	// Clear UDP conntrack entries for every stale service IP collected at the top.
	for _, svcIP := range staleServices.UnsortedList() {
		if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
			klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
		}
	}
	proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
  1177. // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
  1178. // according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
  1179. // some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
  1180. func (proxier *Proxier) writeIptablesRules() {
  1181. // We are creating those slices ones here to avoid memory reallocations
  1182. // in every loop. Note that reuse the memory, instead of doing:
  1183. // slice = <some new slice>
  1184. // you should always do one of the below:
  1185. // slice = slice[:0] // and then append to it
  1186. // slice = append(slice[:0], ...)
  1187. // To avoid growing this slice, we arbitrarily set its size to 64,
  1188. // there is never more than that many arguments for a single line.
  1189. // Note that even if we go over 64, it will still be correct - it
  1190. // is just for efficiency, not correctness.
  1191. args := make([]string, 64)
  1192. for _, set := range ipsetWithIptablesChain {
  1193. if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
  1194. args = append(args[:0], "-A", set.from)
  1195. if set.protocolMatch != "" {
  1196. args = append(args, "-p", set.protocolMatch)
  1197. }
  1198. args = append(args,
  1199. "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
  1200. "-m", "set", "--match-set", set.name,
  1201. set.matchType,
  1202. )
  1203. writeLine(proxier.natRules, append(args, "-j", set.to)...)
  1204. }
  1205. }
  1206. if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
  1207. args = append(args[:0],
  1208. "-A", string(kubeServicesChain),
  1209. "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
  1210. "-m", "set", "--match-set", kubeClusterIPSet,
  1211. )
  1212. if proxier.masqueradeAll {
  1213. writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
  1214. } else if len(proxier.clusterCIDR) > 0 {
  1215. // This masquerades off-cluster traffic to a service VIP. The idea
  1216. // is that you can establish a static route for your Service range,
  1217. // routing to any node, and that node will bridge into the Service
  1218. // for you. Since that might bounce off-node, we masquerade here.
  1219. // If/when we support "Local" policy for VIPs, we should update this.
  1220. writeLine(proxier.natRules, append(args, "dst,dst", "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
  1221. } else {
  1222. // Masquerade all OUTPUT traffic coming from a service ip.
  1223. // The kube dummy interface has all service VIPs assigned which
  1224. // results in the service VIP being picked as the source IP to reach
  1225. // a VIP. This leads to a connection from VIP:<random port> to
  1226. // VIP:<service port>.
  1227. // Always masquerading OUTPUT (node-originating) traffic with a VIP
  1228. // source ip and service port destination fixes the outgoing connections.
  1229. writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
  1230. }
  1231. }
  1232. if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
  1233. // Build masquerade rules for packets to external IPs.
  1234. args = append(args[:0],
  1235. "-A", string(kubeServicesChain),
  1236. "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
  1237. "-m", "set", "--match-set", kubeExternalIPSet,
  1238. "dst,dst",
  1239. )
  1240. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
  1241. // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
  1242. // nor from a local process to be forwarded to the service.
  1243. // This rule roughly translates to "all traffic from off-machine".
  1244. // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
  1245. externalTrafficOnlyArgs := append(args,
  1246. "-m", "physdev", "!", "--physdev-is-in",
  1247. "-m", "addrtype", "!", "--src-type", "LOCAL")
  1248. writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
  1249. dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  1250. // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
  1251. // This covers cases like GCE load-balancers which get added to the local routing table.
  1252. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
  1253. }
  1254. // -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
  1255. args = append(args[:0],
  1256. "-A", string(kubeServicesChain),
  1257. "-m", "addrtype", "--dst-type", "LOCAL",
  1258. )
  1259. writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
  1260. // mark drop for KUBE-LOAD-BALANCER
  1261. writeLine(proxier.natRules, []string{
  1262. "-A", string(KubeLoadBalancerChain),
  1263. "-j", string(KubeMarkMasqChain),
  1264. }...)
  1265. // mark drop for KUBE-FIRE-WALL
  1266. writeLine(proxier.natRules, []string{
  1267. "-A", string(KubeFireWallChain),
  1268. "-j", string(KubeMarkDropChain),
  1269. }...)
  1270. // Accept all traffic with destination of ipvs virtual service, in case other iptables rules
  1271. // block the traffic, that may result in ipvs rules invalid.
  1272. // Those rules must be in the end of KUBE-SERVICE chain
  1273. proxier.acceptIPVSTraffic()
  1274. // If the masqueradeMark has been added then we want to forward that same
  1275. // traffic, this allows NodePort traffic to be forwarded even if the default
  1276. // FORWARD policy is not accept.
  1277. writeLine(proxier.filterRules,
  1278. "-A", string(KubeForwardChain),
  1279. "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  1280. "-m", "mark", "--mark", proxier.masqueradeMark,
  1281. "-j", "ACCEPT",
  1282. )
  1283. // The following rules can only be set if clusterCIDR has been defined.
  1284. if len(proxier.clusterCIDR) != 0 {
  1285. // The following two rules ensure the traffic after the initial packet
  1286. // accepted by the "kubernetes forwarding rules" rule above will be
  1287. // accepted, to be as specific as possible the traffic must be sourced
  1288. // or destined to the clusterCIDR (to/from a pod).
  1289. writeLine(proxier.filterRules,
  1290. "-A", string(KubeForwardChain),
  1291. "-s", proxier.clusterCIDR,
  1292. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
  1293. "-m", "conntrack",
  1294. "--ctstate", "RELATED,ESTABLISHED",
  1295. "-j", "ACCEPT",
  1296. )
  1297. writeLine(proxier.filterRules,
  1298. "-A", string(KubeForwardChain),
  1299. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
  1300. "-d", proxier.clusterCIDR,
  1301. "-m", "conntrack",
  1302. "--ctstate", "RELATED,ESTABLISHED",
  1303. "-j", "ACCEPT",
  1304. )
  1305. }
  1306. // Write the end-of-table markers.
  1307. writeLine(proxier.filterRules, "COMMIT")
  1308. writeLine(proxier.natRules, "COMMIT")
  1309. }
  1310. func (proxier *Proxier) acceptIPVSTraffic() {
  1311. sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
  1312. for _, set := range sets {
  1313. var matchType string
  1314. if !proxier.ipsetList[set].isEmpty() {
  1315. switch proxier.ipsetList[set].SetType {
  1316. case utilipset.BitmapPort:
  1317. matchType = "dst"
  1318. default:
  1319. matchType = "dst,dst"
  1320. }
  1321. writeLine(proxier.natRules, []string{
  1322. "-A", string(kubeServicesChain),
  1323. "-m", "set", "--match-set", set, matchType,
  1324. "-j", "ACCEPT",
  1325. }...)
  1326. }
  1327. }
  1328. }
  1329. // createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link.
  1330. func (proxier *Proxier) createAndLinkeKubeChain() {
  1331. existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
  1332. existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
  1333. // Make sure we keep stats for the top-level chains
  1334. for _, ch := range iptablesChains {
  1335. if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
  1336. klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
  1337. return
  1338. }
  1339. if ch.table == utiliptables.TableNAT {
  1340. if chain, ok := existingNATChains[ch.chain]; ok {
  1341. writeBytesLine(proxier.natChains, chain)
  1342. } else {
  1343. writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
  1344. }
  1345. } else {
  1346. if chain, ok := existingFilterChains[KubeForwardChain]; ok {
  1347. writeBytesLine(proxier.filterChains, chain)
  1348. } else {
  1349. writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
  1350. }
  1351. }
  1352. }
  1353. for _, jc := range iptablesJumpChain {
  1354. args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
  1355. if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
  1356. klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
  1357. }
  1358. }
  1359. // Install the kubernetes-specific postrouting rules. We use a whole chain for
  1360. // this so that it is easier to flush and change, for example if the mark
  1361. // value should ever change.
  1362. writeLine(proxier.natRules, []string{
  1363. "-A", string(kubePostroutingChain),
  1364. "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
  1365. "-m", "mark", "--mark", proxier.masqueradeMark,
  1366. "-j", "MASQUERADE",
  1367. }...)
  1368. // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
  1369. // this so that it is easier to flush and change, for example if the mark
  1370. // value should ever change.
  1371. writeLine(proxier.natRules, []string{
  1372. "-A", string(KubeMarkMasqChain),
  1373. "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
  1374. }...)
  1375. }
  1376. // getExistingChains get iptables-save output so we can check for existing chains and rules.
  1377. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
  1378. // Result may SHARE memory with contents of buffer.
  1379. func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
  1380. buffer.Reset()
  1381. err := proxier.iptables.SaveInto(table, buffer)
  1382. if err != nil { // if we failed to get any rules
  1383. klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
  1384. } else { // otherwise parse the output
  1385. return utiliptables.GetChainLines(table, buffer.Bytes())
  1386. }
  1387. return nil
  1388. }
  1389. // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
  1390. // risk sending more traffic to it, all of which will be lost (because UDP).
  1391. // This assumes the proxier mutex is held
  1392. func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
  1393. for _, epSvcPair := range connectionMap {
  1394. if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP {
  1395. endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
  1396. err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
  1397. if err != nil {
  1398. klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
  1399. }
  1400. for _, extIP := range svcInfo.ExternalIPStrings() {
  1401. err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
  1402. if err != nil {
  1403. klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
  1404. }
  1405. }
  1406. for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
  1407. err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
  1408. if err != nil {
  1409. klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
  1410. }
  1411. }
  1412. }
  1413. }
  1414. }
  1415. func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
  1416. appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  1417. if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
  1418. if appliedVirtualServer == nil {
  1419. // IPVS service is not found, create a new service
  1420. klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
  1421. if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
  1422. klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
  1423. return err
  1424. }
  1425. } else {
  1426. // IPVS service was changed, update the existing one
  1427. // During updates, service VIP will not go down
  1428. klog.V(3).Infof("IPVS service %s was changed", svcName)
  1429. if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
  1430. klog.Errorf("Failed to update IPVS service, err:%v", err)
  1431. return err
  1432. }
  1433. }
  1434. }
  1435. // bind service address to dummy interface even if service not changed,
  1436. // in case that service IP was removed by other processes
  1437. if bindAddr {
  1438. klog.V(4).Infof("Bind addr %s", vs.Address.String())
  1439. _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
  1440. if err != nil {
  1441. klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
  1442. return err
  1443. }
  1444. }
  1445. return nil
  1446. }
  1447. func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
  1448. appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
  1449. if err != nil || appliedVirtualServer == nil {
  1450. klog.Errorf("Failed to get IPVS service, error: %v", err)
  1451. return err
  1452. }
  1453. // curEndpoints represents IPVS destinations listed from current system.
  1454. curEndpoints := sets.NewString()
  1455. // newEndpoints represents Endpoints watched from API Server.
  1456. newEndpoints := sets.NewString()
  1457. curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
  1458. if err != nil {
  1459. klog.Errorf("Failed to list IPVS destinations, error: %v", err)
  1460. return err
  1461. }
  1462. for _, des := range curDests {
  1463. curEndpoints.Insert(des.String())
  1464. }
  1465. for _, epInfo := range proxier.endpointsMap[svcPortName] {
  1466. if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
  1467. continue
  1468. }
  1469. newEndpoints.Insert(epInfo.String())
  1470. }
  1471. // Create new endpoints
  1472. for _, ep := range newEndpoints.List() {
  1473. ip, port, err := net.SplitHostPort(ep)
  1474. if err != nil {
  1475. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1476. continue
  1477. }
  1478. portNum, err := strconv.Atoi(port)
  1479. if err != nil {
  1480. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1481. continue
  1482. }
  1483. newDest := &utilipvs.RealServer{
  1484. Address: net.ParseIP(ip),
  1485. Port: uint16(portNum),
  1486. Weight: 1,
  1487. }
  1488. if curEndpoints.Has(ep) {
  1489. // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
  1490. uniqueRS := GetUniqueRSName(vs, newDest)
  1491. if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1492. continue
  1493. }
  1494. klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
  1495. err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
  1496. if err != nil {
  1497. klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
  1498. continue
  1499. }
  1500. }
  1501. err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
  1502. if err != nil {
  1503. klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
  1504. continue
  1505. }
  1506. }
  1507. // Delete old endpoints
  1508. for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
  1509. // if curEndpoint is in gracefulDelete, skip
  1510. uniqueRS := vs.String() + "/" + ep
  1511. if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1512. continue
  1513. }
  1514. ip, port, err := net.SplitHostPort(ep)
  1515. if err != nil {
  1516. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1517. continue
  1518. }
  1519. portNum, err := strconv.Atoi(port)
  1520. if err != nil {
  1521. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1522. continue
  1523. }
  1524. delDest := &utilipvs.RealServer{
  1525. Address: net.ParseIP(ip),
  1526. Port: uint16(portNum),
  1527. }
  1528. klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
  1529. err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
  1530. if err != nil {
  1531. klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
  1532. continue
  1533. }
  1534. }
  1535. return nil
  1536. }
  1537. func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
  1538. for cs := range currentServices {
  1539. svc := currentServices[cs]
  1540. if proxier.isIPInExcludeCIDRs(svc.Address) {
  1541. continue
  1542. }
  1543. if _, ok := activeServices[cs]; !ok {
  1544. klog.V(4).Infof("Delete service %s", svc.String())
  1545. if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
  1546. klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
  1547. }
  1548. addr := svc.Address.String()
  1549. if _, ok := legacyBindAddrs[addr]; ok {
  1550. klog.V(4).Infof("Unbinding address %s", addr)
  1551. if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
  1552. klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
  1553. } else {
  1554. // In case we delete a multi-port service, avoid trying to unbind multiple times
  1555. delete(legacyBindAddrs, addr)
  1556. }
  1557. }
  1558. }
  1559. }
  1560. }
  1561. func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
  1562. // make sure it does not fall within an excluded CIDR range.
  1563. for _, excludedCIDR := range proxier.excludeCIDRs {
  1564. if excludedCIDR.Contains(ip) {
  1565. return true
  1566. }
  1567. }
  1568. return false
  1569. }
  1570. func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
  1571. legacyAddrs := make(map[string]bool)
  1572. isIpv6 := utilnet.IsIPv6(proxier.nodeIP)
  1573. for _, addr := range currentBindAddrs {
  1574. addrIsIpv6 := utilnet.IsIPv6(net.ParseIP(addr))
  1575. if addrIsIpv6 && !isIpv6 || !addrIsIpv6 && isIpv6 {
  1576. continue
  1577. }
  1578. if _, ok := activeBindAddrs[addr]; !ok {
  1579. legacyAddrs[addr] = true
  1580. }
  1581. }
  1582. return legacyAddrs
  1583. }
  1584. // Join all words with spaces, terminate with newline and write to buff.
  1585. func writeLine(buf *bytes.Buffer, words ...string) {
  1586. // We avoid strings.Join for performance reasons.
  1587. for i := range words {
  1588. buf.WriteString(words[i])
  1589. if i < len(words)-1 {
  1590. buf.WriteByte(' ')
  1591. } else {
  1592. buf.WriteByte('\n')
  1593. }
  1594. }
  1595. }
  1596. func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
  1597. buf.Write(bytes)
  1598. buf.WriteByte('\n')
  1599. }
  1600. // listenPortOpener opens ports by calling bind() and listen().
  1601. type listenPortOpener struct{}
  1602. // OpenLocalPort holds the given local port open.
  1603. func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
  1604. return openLocalPort(lp)
  1605. }
  1606. func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
  1607. // For ports on node IPs, open the actual port and hold it, even though we
  1608. // use iptables to redirect traffic.
  1609. // This ensures a) that it's safe to use that port and b) that (a) stays
  1610. // true. The risk is that some process on the node (e.g. sshd or kubelet)
  1611. // is using a port and we give that same port out to a Service. That would
  1612. // be bad because iptables would silently claim the traffic but the process
  1613. // would never know.
  1614. // NOTE: We should not need to have a real listen()ing socket - bind()
  1615. // should be enough, but I can't figure out a way to e2e test without
  1616. // it. Tools like 'ss' and 'netstat' do not show sockets that are
  1617. // bind()ed but not listen()ed, and at least the default debian netcat
  1618. // has no way to avoid about 10 seconds of retries.
  1619. var socket utilproxy.Closeable
  1620. switch lp.Protocol {
  1621. case "tcp":
  1622. listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1623. if err != nil {
  1624. return nil, err
  1625. }
  1626. socket = listener
  1627. case "udp":
  1628. addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1629. if err != nil {
  1630. return nil, err
  1631. }
  1632. conn, err := net.ListenUDP("udp", addr)
  1633. if err != nil {
  1634. return nil, err
  1635. }
  1636. socket = conn
  1637. default:
  1638. return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
  1639. }
  1640. klog.V(2).Infof("Opened local port %s", lp.String())
  1641. return socket, nil
  1642. }
// ipvs Proxier falls back on iptables when it needs to do SNAT for egress packets.
// It will only operate on the iptables *nat table.
// Create and link the kube postrouting chain for SNAT packets.
// Chain POSTROUTING (policy ACCEPT)
// target prot opt source destination
// KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
// Maintained by the kubelet network sync loop
// *nat
// :KUBE-POSTROUTING - [0:0]
// Chain KUBE-POSTROUTING (1 references)
// target prot opt source destination
// MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
// :KUBE-MARK-MASQ - [0:0]
// Chain KUBE-MARK-MASQ (0 references)
// target prot opt source destination
// MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000