proxier.go 81 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "io/ioutil"
  19. "net"
  20. "os"
  21. "reflect"
  22. "regexp"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "k8s.io/klog"
  29. utilexec "k8s.io/utils/exec"
  30. utilnet "k8s.io/utils/net"
  31. v1 "k8s.io/api/core/v1"
  32. discovery "k8s.io/api/discovery/v1beta1"
  33. "k8s.io/apimachinery/pkg/types"
  34. "k8s.io/apimachinery/pkg/util/sets"
  35. "k8s.io/apimachinery/pkg/util/version"
  36. "k8s.io/apimachinery/pkg/util/wait"
  37. utilfeature "k8s.io/apiserver/pkg/util/feature"
  38. "k8s.io/client-go/tools/record"
  39. "k8s.io/kubernetes/pkg/features"
  40. "k8s.io/kubernetes/pkg/proxy"
  41. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  42. "k8s.io/kubernetes/pkg/proxy/metaproxier"
  43. "k8s.io/kubernetes/pkg/proxy/metrics"
  44. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  45. "k8s.io/kubernetes/pkg/util/async"
  46. "k8s.io/kubernetes/pkg/util/conntrack"
  47. utilipset "k8s.io/kubernetes/pkg/util/ipset"
  48. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  49. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  50. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  51. )
  52. const (
  53. // kubeServicesChain is the services portal chain
  54. kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
  55. // KubeFireWallChain is the kubernetes firewall chain.
  56. KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"
  57. // kubePostroutingChain is the kubernetes postrouting chain
  58. kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
  59. // KubeMarkMasqChain is the mark-for-masquerade chain
  60. KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
  61. // KubeNodePortChain is the kubernetes node port chain
  62. KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
  63. // KubeMarkDropChain is the mark-for-drop chain
  64. KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
  65. // KubeForwardChain is the kubernetes forward chain
  66. KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
  67. // KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
  68. KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
  69. // DefaultScheduler is the default ipvs scheduler algorithm - round robin.
  70. DefaultScheduler = "rr"
  71. // DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
  72. DefaultDummyDevice = "kube-ipvs0"
  73. )
  74. // iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
  75. // `to` is the iptables chain we want to operate.
  76. // `from` is the source iptables chain
  77. var iptablesJumpChain = []struct {
  78. table utiliptables.Table
  79. from utiliptables.Chain
  80. to utiliptables.Chain
  81. comment string
  82. }{
  83. {utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
  84. {utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
  85. {utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
  86. {utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
  87. }
  88. var iptablesChains = []struct {
  89. table utiliptables.Table
  90. chain utiliptables.Chain
  91. }{
  92. {utiliptables.TableNAT, kubeServicesChain},
  93. {utiliptables.TableNAT, kubePostroutingChain},
  94. {utiliptables.TableNAT, KubeFireWallChain},
  95. {utiliptables.TableNAT, KubeNodePortChain},
  96. {utiliptables.TableNAT, KubeLoadBalancerChain},
  97. {utiliptables.TableNAT, KubeMarkMasqChain},
  98. {utiliptables.TableNAT, KubeMarkDropChain},
  99. {utiliptables.TableFilter, KubeForwardChain},
  100. }
  101. var iptablesCleanupChains = []struct {
  102. table utiliptables.Table
  103. chain utiliptables.Chain
  104. }{
  105. {utiliptables.TableNAT, kubeServicesChain},
  106. {utiliptables.TableNAT, kubePostroutingChain},
  107. {utiliptables.TableNAT, KubeFireWallChain},
  108. {utiliptables.TableNAT, KubeNodePortChain},
  109. {utiliptables.TableNAT, KubeLoadBalancerChain},
  110. {utiliptables.TableFilter, KubeForwardChain},
  111. }
  112. // ipsetInfo is all ipset we needed in ipvs proxier
  113. var ipsetInfo = []struct {
  114. name string
  115. setType utilipset.Type
  116. comment string
  117. }{
  118. {kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
  119. {kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
  120. {kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
  121. {kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
  122. {kubeLoadbalancerFWSet, utilipset.HashIPPort, kubeLoadbalancerFWSetComment},
  123. {kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
  124. {kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
  125. {kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
  126. {kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
  127. {kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
  128. {kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
  129. {kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
  130. {kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
  131. {kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
  132. }
  133. // ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
  134. // `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
  135. // example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
  136. // ipsets with other match rules will be created Individually.
  137. // Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
  138. var ipsetWithIptablesChain = []struct {
  139. name string
  140. from string
  141. to string
  142. matchType string
  143. protocolMatch string
  144. }{
  145. {kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
  146. {kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst", ""},
  147. {kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst", ""},
  148. {kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  149. {kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src", ""},
  150. {kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst", ""},
  151. {kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst", "tcp"},
  152. {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "tcp"},
  153. {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", "udp"},
  154. {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", "udp"},
  155. {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", "sctp"},
  156. {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", "sctp"},
  157. }
  158. // In IPVS proxy mode, the following flags need to be set
  159. const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
  160. const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
  161. const sysctlVSConnTrack = "net/ipv4/vs/conntrack"
  162. const sysctlConnReuse = "net/ipv4/vs/conn_reuse_mode"
  163. const sysctlExpireNoDestConn = "net/ipv4/vs/expire_nodest_conn"
  164. const sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
  165. const sysctlForward = "net/ipv4/ip_forward"
  166. const sysctlArpIgnore = "net/ipv4/conf/all/arp_ignore"
  167. const sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce"
  168. // Proxier is an ipvs based proxy for connections between a localhost:lport
  169. // and services that provide the actual backends.
  170. type Proxier struct {
  171. // endpointsChanges and serviceChanges contains all changes to endpoints and
  172. // services that happened since last syncProxyRules call. For a single object,
  173. // changes are accumulated, i.e. previous is state from before all of them,
  174. // current is state after applying all of those.
  175. endpointsChanges *proxy.EndpointChangeTracker
  176. serviceChanges *proxy.ServiceChangeTracker
  177. mu sync.Mutex // protects the following fields
  178. serviceMap proxy.ServiceMap
  179. endpointsMap proxy.EndpointsMap
  180. portsMap map[utilproxy.LocalPort]utilproxy.Closeable
  181. nodeLabels map[string]string
  182. // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
  183. // corresponding objects are synced after startup. This is used to avoid updating
  184. // ipvs rules with some partial data after kube-proxy restart.
  185. endpointsSynced bool
  186. endpointSlicesSynced bool
  187. servicesSynced bool
  188. initialized int32
  189. syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
  190. // These are effectively const and do not need the mutex to be held.
  191. syncPeriod time.Duration
  192. minSyncPeriod time.Duration
  193. // Values are CIDR's to exclude when cleaning up IPVS rules.
  194. excludeCIDRs []*net.IPNet
  195. // Set to true to set sysctls arp_ignore and arp_announce
  196. strictARP bool
  197. iptables utiliptables.Interface
  198. ipvs utilipvs.Interface
  199. ipset utilipset.Interface
  200. exec utilexec.Interface
  201. masqueradeAll bool
  202. masqueradeMark string
  203. clusterCIDR string
  204. hostname string
  205. nodeIP net.IP
  206. portMapper utilproxy.PortOpener
  207. recorder record.EventRecorder
  208. serviceHealthServer healthcheck.ServiceHealthServer
  209. healthzServer healthcheck.ProxierHealthUpdater
  210. ipvsScheduler string
  211. // Added as a member to the struct to allow injection for testing.
  212. ipGetter IPGetter
  213. // The following buffers are used to reuse memory and avoid allocations
  214. // that are significantly impacting performance.
  215. iptablesData *bytes.Buffer
  216. filterChainsData *bytes.Buffer
  217. natChains *bytes.Buffer
  218. filterChains *bytes.Buffer
  219. natRules *bytes.Buffer
  220. filterRules *bytes.Buffer
  221. // Added as a member to the struct to allow injection for testing.
  222. netlinkHandle NetLinkHandle
  223. // ipsetList is the list of ipsets that ipvs proxier used.
  224. ipsetList map[string]*IPSet
  225. // Values are as a parameter to select the interfaces which nodeport works.
  226. nodePortAddresses []string
  227. // networkInterfacer defines an interface for several net library functions.
  228. // Inject for test purpose.
  229. networkInterfacer utilproxy.NetworkInterfacer
  230. gracefuldeleteManager *GracefulTerminationManager
  231. }
  232. // IPGetter helps get node network interface IP
  233. type IPGetter interface {
  234. NodeIPs() ([]net.IP, error)
  235. }
  236. // realIPGetter is a real NodeIP handler, it implements IPGetter.
  237. type realIPGetter struct {
  238. // nl is a handle for revoking netlink interface
  239. nl NetLinkHandle
  240. }
  241. // NodeIPs returns all LOCAL type IP addresses from host which are taken as the Node IPs of NodePort service.
  242. // It will list source IP exists in local route table with `kernel` protocol type, and filter out IPVS proxier
  243. // created dummy device `kube-ipvs0` For example,
  244. // $ ip route show table local type local proto kernel
  245. // 10.0.0.1 dev kube-ipvs0 scope host src 10.0.0.1
  246. // 10.0.0.10 dev kube-ipvs0 scope host src 10.0.0.10
  247. // 10.0.0.252 dev kube-ipvs0 scope host src 10.0.0.252
  248. // 100.106.89.164 dev eth0 scope host src 100.106.89.164
  249. // 127.0.0.0/8 dev lo scope host src 127.0.0.1
  250. // 127.0.0.1 dev lo scope host src 127.0.0.1
  251. // 172.17.0.1 dev docker0 scope host src 172.17.0.1
  252. // 192.168.122.1 dev virbr0 scope host src 192.168.122.1
  253. // Then filter out dev==kube-ipvs0, and cut the unique src IP fields,
  254. // Node IP set: [100.106.89.164, 127.0.0.1, 192.168.122.1]
  255. func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
  256. // Pass in empty filter device name for list all LOCAL type addresses.
  257. nodeAddress, err := r.nl.GetLocalAddresses("", DefaultDummyDevice)
  258. if err != nil {
  259. return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err)
  260. }
  261. // translate ip string to IP
  262. for _, ipStr := range nodeAddress.UnsortedList() {
  263. ips = append(ips, net.ParseIP(ipStr))
  264. }
  265. return ips, nil
  266. }
  267. // Proxier implements proxy.Provider
  268. var _ proxy.Provider = &Proxier{}
  269. // parseExcludedCIDRs parses the input strings and returns net.IPNet
  270. // The validation has been done earlier so the error condition will never happen under normal conditions
  271. func parseExcludedCIDRs(excludeCIDRs []string) []*net.IPNet {
  272. var cidrExclusions []*net.IPNet
  273. for _, excludedCIDR := range excludeCIDRs {
  274. _, n, err := net.ParseCIDR(excludedCIDR)
  275. if err != nil {
  276. klog.Errorf("Error parsing exclude CIDR %q, err: %v", excludedCIDR, err)
  277. continue
  278. }
  279. cidrExclusions = append(cidrExclusions, n)
  280. }
  281. return cidrExclusions
  282. }
  283. // NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
  284. // Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
  285. // An error will be returned if it fails to update or acquire the initial lock.
  286. // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
  287. // will not terminate if a particular iptables or ipvs call fails.
  288. func NewProxier(ipt utiliptables.Interface,
  289. ipvs utilipvs.Interface,
  290. ipset utilipset.Interface,
  291. sysctl utilsysctl.Interface,
  292. exec utilexec.Interface,
  293. syncPeriod time.Duration,
  294. minSyncPeriod time.Duration,
  295. excludeCIDRs []string,
  296. strictARP bool,
  297. tcpTimeout time.Duration,
  298. tcpFinTimeout time.Duration,
  299. udpTimeout time.Duration,
  300. masqueradeAll bool,
  301. masqueradeBit int,
  302. clusterCIDR string,
  303. hostname string,
  304. nodeIP net.IP,
  305. recorder record.EventRecorder,
  306. healthzServer healthcheck.ProxierHealthUpdater,
  307. scheduler string,
  308. nodePortAddresses []string,
  309. ) (*Proxier, error) {
  310. // Set the route_localnet sysctl we need for
  311. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  312. if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
  313. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
  314. }
  315. }
  316. // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
  317. // are connected to a Linux bridge (but not SDN bridges). Until most
  318. // plugins handle this, log when config is missing
  319. if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
  320. klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
  321. }
  322. // Set the conntrack sysctl we need for
  323. if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
  324. if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
  325. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
  326. }
  327. }
  328. // Set the connection reuse mode
  329. if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
  330. if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
  331. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
  332. }
  333. }
  334. // Set the expire_nodest_conn sysctl we need for
  335. if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
  336. if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
  337. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
  338. }
  339. }
  340. // Set the expire_quiescent_template sysctl we need for
  341. if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
  342. if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
  343. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
  344. }
  345. }
  346. // Set the ip_forward sysctl we need for
  347. if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
  348. if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
  349. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
  350. }
  351. }
  352. if strictARP {
  353. // Set the arp_ignore sysctl we need for
  354. if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
  355. if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
  356. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
  357. }
  358. }
  359. // Set the arp_announce sysctl we need for
  360. if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
  361. if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
  362. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
  363. }
  364. }
  365. }
  366. // Configure IPVS timeouts if any one of the timeout parameters have been set.
  367. // This is the equivalent to running ipvsadm --set, a value of 0 indicates the
  368. // current system timeout should be preserved
  369. if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
  370. if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
  371. klog.Warningf("failed to configure IPVS timeouts: %v", err)
  372. }
  373. }
  374. // Generate the masquerade mark to use for SNAT rules.
  375. masqueradeValue := 1 << uint(masqueradeBit)
  376. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  377. isIPv6 := utilnet.IsIPv6(nodeIP)
  378. klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
  379. if len(clusterCIDR) == 0 {
  380. klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
  381. } else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 {
  382. return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
  383. }
  384. if len(scheduler) == 0 {
  385. klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
  386. scheduler = DefaultScheduler
  387. }
  388. serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
  389. endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
  390. proxier := &Proxier{
  391. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  392. serviceMap: make(proxy.ServiceMap),
  393. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
  394. endpointsMap: make(proxy.EndpointsMap),
  395. endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled),
  396. syncPeriod: syncPeriod,
  397. minSyncPeriod: minSyncPeriod,
  398. excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),
  399. iptables: ipt,
  400. masqueradeAll: masqueradeAll,
  401. masqueradeMark: masqueradeMark,
  402. exec: exec,
  403. clusterCIDR: clusterCIDR,
  404. hostname: hostname,
  405. nodeIP: nodeIP,
  406. portMapper: &listenPortOpener{},
  407. recorder: recorder,
  408. serviceHealthServer: serviceHealthServer,
  409. healthzServer: healthzServer,
  410. ipvs: ipvs,
  411. ipvsScheduler: scheduler,
  412. ipGetter: &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
  413. iptablesData: bytes.NewBuffer(nil),
  414. filterChainsData: bytes.NewBuffer(nil),
  415. natChains: bytes.NewBuffer(nil),
  416. natRules: bytes.NewBuffer(nil),
  417. filterChains: bytes.NewBuffer(nil),
  418. filterRules: bytes.NewBuffer(nil),
  419. netlinkHandle: NewNetLinkHandle(isIPv6),
  420. ipset: ipset,
  421. nodePortAddresses: nodePortAddresses,
  422. networkInterfacer: utilproxy.RealNetwork{},
  423. gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
  424. }
  425. // initialize ipsetList with all sets we needed
  426. proxier.ipsetList = make(map[string]*IPSet)
  427. for _, is := range ipsetInfo {
  428. proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
  429. }
  430. burstSyncs := 2
  431. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  432. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  433. proxier.gracefuldeleteManager.Run()
  434. return proxier, nil
  435. }
  436. // NewDualStackProxier returns a new Proxier for dual-stack operation
  437. func NewDualStackProxier(
  438. ipt [2]utiliptables.Interface,
  439. ipvs utilipvs.Interface,
  440. ipset utilipset.Interface,
  441. sysctl utilsysctl.Interface,
  442. exec utilexec.Interface,
  443. syncPeriod time.Duration,
  444. minSyncPeriod time.Duration,
  445. excludeCIDRs []string,
  446. strictARP bool,
  447. tcpTimeout time.Duration,
  448. tcpFinTimeout time.Duration,
  449. udpTimeout time.Duration,
  450. masqueradeAll bool,
  451. masqueradeBit int,
  452. clusterCIDR [2]string,
  453. hostname string,
  454. nodeIP [2]net.IP,
  455. recorder record.EventRecorder,
  456. healthzServer healthcheck.ProxierHealthUpdater,
  457. scheduler string,
  458. nodePortAddresses []string,
  459. ) (proxy.Provider, error) {
  460. safeIpset := newSafeIpset(ipset)
  461. // Create an ipv4 instance of the single-stack proxier
  462. ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl,
  463. exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
  464. tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
  465. clusterCIDR[0], hostname, nodeIP[0],
  466. recorder, healthzServer, scheduler, nodePortAddresses)
  467. if err != nil {
  468. return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
  469. }
  470. ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl,
  471. exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
  472. tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
  473. clusterCIDR[1], hostname, nodeIP[1],
  474. nil, nil, scheduler, nodePortAddresses)
  475. if err != nil {
  476. return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
  477. }
  478. // Return a meta-proxier that dispatch calls between the two
  479. // single-stack proxier instances
  480. return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
  481. }
  482. func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
  483. var filteredCIDRs []string
  484. for _, cidr := range cidrs {
  485. if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 {
  486. filteredCIDRs = append(filteredCIDRs, cidr)
  487. }
  488. }
  489. return filteredCIDRs
  490. }
  491. // internal struct for string service information
  492. type serviceInfo struct {
  493. *proxy.BaseServiceInfo
  494. // The following fields are computed and stored for performance reasons.
  495. serviceNameString string
  496. }
  497. // returns a new proxy.ServicePort which abstracts a serviceInfo
  498. func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
  499. info := &serviceInfo{BaseServiceInfo: baseInfo}
  500. // Store the following for performance reasons.
  501. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  502. svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
  503. info.serviceNameString = svcPortName.String()
  504. return info
  505. }
  506. // KernelHandler can handle the current installed kernel modules.
  507. type KernelHandler interface {
  508. GetModules() ([]string, error)
  509. GetKernelVersion() (string, error)
  510. }
  511. // LinuxKernelHandler implements KernelHandler interface.
  512. type LinuxKernelHandler struct {
  513. executor utilexec.Interface
  514. }
  515. // NewLinuxKernelHandler initializes LinuxKernelHandler with exec.
  516. func NewLinuxKernelHandler() *LinuxKernelHandler {
  517. return &LinuxKernelHandler{
  518. executor: utilexec.New(),
  519. }
  520. }
  521. // GetModules returns all installed kernel modules.
  522. func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
  523. // Check whether IPVS required kernel modules are built-in
  524. kernelVersionStr, err := handle.GetKernelVersion()
  525. if err != nil {
  526. return nil, err
  527. }
  528. kernelVersion, err := version.ParseGeneric(kernelVersionStr)
  529. if err != nil {
  530. return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
  531. }
  532. ipvsModules := utilipvs.GetRequiredIPVSModules(kernelVersion)
  533. var bmods []string
  534. // Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION
  535. modulesFile, err := os.Open("/proc/modules")
  536. if err == os.ErrNotExist {
  537. klog.Warningf("Failed to read file /proc/modules with error %v. Assuming this is a kernel without loadable modules support enabled", err)
  538. kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr)
  539. kConfig, err := ioutil.ReadFile(kernelConfigFile)
  540. if err != nil {
  541. return nil, fmt.Errorf("Failed to read Kernel Config file %s with error %v", kernelConfigFile, err)
  542. }
  543. for _, module := range ipvsModules {
  544. if match, _ := regexp.Match("CONFIG_"+strings.ToUpper(module)+"=y", kConfig); match {
  545. bmods = append(bmods, module)
  546. }
  547. }
  548. return bmods, nil
  549. }
  550. if err != nil {
  551. return nil, fmt.Errorf("Failed to read file /proc/modules with error %v", err)
  552. }
  553. mods, err := getFirstColumn(modulesFile)
  554. if err != nil {
  555. return nil, fmt.Errorf("failed to find loaded kernel modules: %v", err)
  556. }
  557. builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr)
  558. b, err := ioutil.ReadFile(builtinModsFilePath)
  559. if err != nil {
  560. klog.Warningf("Failed to read file %s with error %v. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", builtinModsFilePath, err)
  561. }
  562. for _, module := range ipvsModules {
  563. if match, _ := regexp.Match(module+".ko", b); match {
  564. bmods = append(bmods, module)
  565. } else {
  566. // Try to load the required IPVS kernel modules if not built in
  567. err := handle.executor.Command("modprobe", "--", module).Run()
  568. if err != nil {
  569. klog.Warningf("Failed to load kernel module %v with modprobe. "+
  570. "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", module)
  571. }
  572. }
  573. }
  574. return append(mods, bmods...), nil
  575. }
  576. // getFirstColumn reads all the content from r into memory and return a
  577. // slice which consists of the first word from each line.
  578. func getFirstColumn(r io.Reader) ([]string, error) {
  579. b, err := ioutil.ReadAll(r)
  580. if err != nil {
  581. return nil, err
  582. }
  583. lines := strings.Split(string(b), "\n")
  584. words := make([]string, 0, len(lines))
  585. for i := range lines {
  586. fields := strings.Fields(lines[i])
  587. if len(fields) > 0 {
  588. words = append(words, fields[0])
  589. }
  590. }
  591. return words, nil
  592. }
  593. // GetKernelVersion returns currently running kernel version.
  594. func (handle *LinuxKernelHandler) GetKernelVersion() (string, error) {
  595. kernelVersionFile := "/proc/sys/kernel/osrelease"
  596. fileContent, err := ioutil.ReadFile(kernelVersionFile)
  597. if err != nil {
  598. return "", fmt.Errorf("error reading osrelease file %q: %v", kernelVersionFile, err)
  599. }
  600. return strings.TrimSpace(string(fileContent)), nil
  601. }
  602. // CanUseIPVSProxier returns true if we can use the ipvs Proxier.
  603. // This is determined by checking if all the required kernel modules can be loaded. It may
  604. // return an error if it fails to get the kernel modules information without error, in which
  605. // case it will also return false.
  606. func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner) (bool, error) {
  607. mods, err := handle.GetModules()
  608. if err != nil {
  609. return false, fmt.Errorf("error getting installed ipvs required kernel modules: %v", err)
  610. }
  611. loadModules := sets.NewString()
  612. loadModules.Insert(mods...)
  613. kernelVersionStr, err := handle.GetKernelVersion()
  614. if err != nil {
  615. return false, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %v", err)
  616. }
  617. kernelVersion, err := version.ParseGeneric(kernelVersionStr)
  618. if err != nil {
  619. return false, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
  620. }
  621. mods = utilipvs.GetRequiredIPVSModules(kernelVersion)
  622. wantModules := sets.NewString()
  623. wantModules.Insert(mods...)
  624. modules := wantModules.Difference(loadModules).UnsortedList()
  625. var missingMods []string
  626. ConntrackiMissingCounter := 0
  627. for _, mod := range modules {
  628. if strings.Contains(mod, "nf_conntrack") {
  629. ConntrackiMissingCounter++
  630. } else {
  631. missingMods = append(missingMods, mod)
  632. }
  633. }
  634. if ConntrackiMissingCounter == 2 {
  635. missingMods = append(missingMods, "nf_conntrack_ipv4(or nf_conntrack for Linux kernel 4.19 and later)")
  636. }
  637. if len(missingMods) != 0 {
  638. return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", missingMods)
  639. }
  640. // Check ipset version
  641. versionString, err := ipsetver.GetVersion()
  642. if err != nil {
  643. return false, fmt.Errorf("error getting ipset version, error: %v", err)
  644. }
  645. if !checkMinVersion(versionString) {
  646. return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
  647. }
  648. return true, nil
  649. }
  650. // CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
  651. // It returns true if an error was encountered. Errors are logged.
  652. func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
  653. // Unlink the iptables chains created by ipvs Proxier
  654. for _, jc := range iptablesJumpChain {
  655. args := []string{
  656. "-m", "comment", "--comment", jc.comment,
  657. "-j", string(jc.to),
  658. }
  659. if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
  660. if !utiliptables.IsNotFoundError(err) {
  661. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  662. encounteredError = true
  663. }
  664. }
  665. }
  666. // Flush and remove all of our chains. Flushing all chains before removing them also removes all links between chains first.
  667. for _, ch := range iptablesCleanupChains {
  668. if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
  669. if !utiliptables.IsNotFoundError(err) {
  670. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  671. encounteredError = true
  672. }
  673. }
  674. }
  675. // Remove all of our chains.
  676. for _, ch := range iptablesCleanupChains {
  677. if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
  678. if !utiliptables.IsNotFoundError(err) {
  679. klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
  680. encounteredError = true
  681. }
  682. }
  683. }
  684. return encounteredError
  685. }
  686. // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
  687. func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface, cleanupIPVS bool) (encounteredError bool) {
  688. if cleanupIPVS {
  689. // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere.
  690. if ipvs == nil {
  691. return true
  692. }
  693. encounteredError = false
  694. err := ipvs.Flush()
  695. if err != nil {
  696. klog.Errorf("Error flushing IPVS rules: %v", err)
  697. encounteredError = true
  698. }
  699. }
  700. // Delete dummy interface created by ipvs Proxier.
  701. nl := NewNetLinkHandle(false)
  702. err := nl.DeleteDummyDevice(DefaultDummyDevice)
  703. if err != nil {
  704. klog.Errorf("Error deleting dummy device %s created by IPVS proxier: %v", DefaultDummyDevice, err)
  705. encounteredError = true
  706. }
  707. // Clear iptables created by ipvs Proxier.
  708. encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
  709. // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
  710. // iptables since we can NOT delete ip set which is still referenced by iptables.
  711. for _, set := range ipsetInfo {
  712. err = ipset.DestroySet(set.name)
  713. if err != nil {
  714. if !utilipset.IsNotFoundError(err) {
  715. klog.Errorf("Error removing ipset %s, error: %v", set.name, err)
  716. encounteredError = true
  717. }
  718. }
  719. }
  720. return encounteredError
  721. }
  722. // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
  723. func (proxier *Proxier) Sync() {
  724. if proxier.healthzServer != nil {
  725. proxier.healthzServer.QueuedUpdate()
  726. }
  727. proxier.syncRunner.Run()
  728. }
  729. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  730. func (proxier *Proxier) SyncLoop() {
  731. // Update healthz timestamp at beginning in case Sync() never succeeds.
  732. if proxier.healthzServer != nil {
  733. proxier.healthzServer.Updated()
  734. }
  735. proxier.syncRunner.Loop(wait.NeverStop)
  736. }
  737. func (proxier *Proxier) setInitialized(value bool) {
  738. var initialized int32
  739. if value {
  740. initialized = 1
  741. }
  742. atomic.StoreInt32(&proxier.initialized, initialized)
  743. }
  744. func (proxier *Proxier) isInitialized() bool {
  745. return atomic.LoadInt32(&proxier.initialized) > 0
  746. }
  747. // OnServiceAdd is called whenever creation of new service object is observed.
  748. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  749. proxier.OnServiceUpdate(nil, service)
  750. }
  751. // OnServiceUpdate is called whenever modification of an existing service object is observed.
  752. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  753. if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
  754. proxier.Sync()
  755. }
  756. }
  757. // OnServiceDelete is called whenever deletion of an existing service object is observed.
  758. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  759. proxier.OnServiceUpdate(service, nil)
  760. }
  761. // OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  762. func (proxier *Proxier) OnServiceSynced() {
  763. proxier.mu.Lock()
  764. proxier.servicesSynced = true
  765. if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
  766. proxier.setInitialized(proxier.endpointSlicesSynced)
  767. } else {
  768. proxier.setInitialized(proxier.endpointsSynced)
  769. }
  770. proxier.mu.Unlock()
  771. // Sync unconditionally - this is called once per lifetime.
  772. proxier.syncProxyRules()
  773. }
  774. // OnEndpointsAdd is called whenever creation of new endpoints object is observed.
  775. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  776. proxier.OnEndpointsUpdate(nil, endpoints)
  777. }
  778. // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
  779. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  780. if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
  781. proxier.Sync()
  782. }
  783. }
  784. // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
  785. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  786. proxier.OnEndpointsUpdate(endpoints, nil)
  787. }
  788. // OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
  789. func (proxier *Proxier) OnEndpointsSynced() {
  790. proxier.mu.Lock()
  791. proxier.endpointsSynced = true
  792. proxier.setInitialized(proxier.servicesSynced)
  793. proxier.mu.Unlock()
  794. // Sync unconditionally - this is called once per lifetime.
  795. proxier.syncProxyRules()
  796. }
  797. // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
  798. // is observed.
  799. func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
  800. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  801. proxier.Sync()
  802. }
  803. }
  804. // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
  805. // slice object is observed.
  806. func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
  807. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  808. proxier.Sync()
  809. }
  810. }
  811. // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
  812. // object is observed.
  813. func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
  814. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
  815. proxier.Sync()
  816. }
  817. }
  818. // OnEndpointSlicesSynced is called once all the initial event handlers were
  819. // called and the state is fully propagated to local cache.
  820. func (proxier *Proxier) OnEndpointSlicesSynced() {
  821. proxier.mu.Lock()
  822. proxier.endpointSlicesSynced = true
  823. proxier.setInitialized(proxier.servicesSynced)
  824. proxier.mu.Unlock()
  825. // Sync unconditionally - this is called once per lifetime.
  826. proxier.syncProxyRules()
  827. }
  828. // OnNodeAdd is called whenever creation of new node object
  829. // is observed.
  830. func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
  831. if node.Name != proxier.hostname {
  832. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  833. return
  834. }
  835. if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
  836. return
  837. }
  838. proxier.mu.Lock()
  839. proxier.nodeLabels = node.Labels
  840. proxier.mu.Unlock()
  841. proxier.syncProxyRules()
  842. }
  843. // OnNodeUpdate is called whenever modification of an existing
  844. // node object is observed.
  845. func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
  846. if node.Name != proxier.hostname {
  847. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  848. return
  849. }
  850. if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
  851. return
  852. }
  853. proxier.mu.Lock()
  854. proxier.nodeLabels = node.Labels
  855. proxier.mu.Unlock()
  856. proxier.syncProxyRules()
  857. }
  858. // OnNodeDelete is called whever deletion of an existing node
  859. // object is observed.
  860. func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
  861. if node.Name != proxier.hostname {
  862. klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
  863. return
  864. }
  865. proxier.mu.Lock()
  866. proxier.nodeLabels = nil
  867. proxier.mu.Unlock()
  868. proxier.syncProxyRules()
  869. }
  870. // OnNodeSynced is called once all the initial event handlers were
  871. // called and the state is fully propagated to local cache.
  872. func (proxier *Proxier) OnNodeSynced() {
  873. }
  874. // EntryInvalidErr indicates if an ipset entry is invalid or not
  875. const EntryInvalidErr = "error adding entry %s to ipset %s"
  876. func getLocalAddrs() ([]net.IP, error) {
  877. var localAddrs []net.IP
  878. addrs, err := net.InterfaceAddrs()
  879. if err != nil {
  880. return nil, err
  881. }
  882. for _, addr := range addrs {
  883. ip, _, err := net.ParseCIDR(addr.String())
  884. if err != nil {
  885. return nil, err
  886. }
  887. localAddrs = append(localAddrs, ip)
  888. }
  889. return localAddrs, nil
  890. }
  891. func ipExists(ip net.IP, addrs []net.IP) bool {
  892. for _, addr := range addrs {
  893. if ip.Equal(addr) {
  894. return true
  895. }
  896. }
  897. return false
  898. }
  899. // This is where all of the ipvs calls happen.
  900. // assumes proxier.mu is held
  901. func (proxier *Proxier) syncProxyRules() {
  902. proxier.mu.Lock()
  903. defer proxier.mu.Unlock()
  904. // don't sync rules till we've received services and endpoints
  905. if !proxier.isInitialized() {
  906. klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
  907. return
  908. }
  909. // Keep track of how long syncs take.
  910. start := time.Now()
  911. defer func() {
  912. metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
  913. klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
  914. }()
  915. localAddrs, err := getLocalAddrs()
  916. if err != nil {
  917. klog.Errorf("Failed to get local addresses during proxy sync: %v", err)
  918. }
  919. // We assume that if this was called, we really want to sync them,
  920. // even if nothing changed in the meantime. In other words, callers are
  921. // responsible for detecting no-op changes and not calling this function.
  922. serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
  923. endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
  924. staleServices := serviceUpdateResult.UDPStaleClusterIP
  925. // merge stale services gathered from updateEndpointsMap
  926. for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
  927. if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
  928. klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
  929. staleServices.Insert(svcInfo.ClusterIP().String())
  930. for _, extIP := range svcInfo.ExternalIPStrings() {
  931. staleServices.Insert(extIP)
  932. }
  933. }
  934. }
  935. klog.V(3).Infof("Syncing ipvs Proxier rules")
  936. // Begin install iptables
  937. // Reset all buffers used later.
  938. // This is to avoid memory reallocations and thus improve performance.
  939. proxier.natChains.Reset()
  940. proxier.natRules.Reset()
  941. proxier.filterChains.Reset()
  942. proxier.filterRules.Reset()
  943. // Write table headers.
  944. writeLine(proxier.filterChains, "*filter")
  945. writeLine(proxier.natChains, "*nat")
  946. proxier.createAndLinkeKubeChain()
  947. // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
  948. _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  949. if err != nil {
  950. klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
  951. return
  952. }
  953. // make sure ip sets exists in the system.
  954. for _, set := range proxier.ipsetList {
  955. if err := ensureIPSet(set); err != nil {
  956. return
  957. }
  958. set.resetEntries()
  959. }
  960. // Accumulate the set of local ports that we will be holding open once this update is complete
  961. replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
  962. // activeIPVSServices represents IPVS service successfully created in this round of sync
  963. activeIPVSServices := map[string]bool{}
  964. // currentIPVSServices represent IPVS services listed from the system
  965. currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
  966. // activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
  967. activeBindAddrs := map[string]bool{}
  968. hasNodePort := false
  969. for _, svc := range proxier.serviceMap {
  970. svcInfo, ok := svc.(*serviceInfo)
  971. if ok && svcInfo.NodePort() != 0 {
  972. hasNodePort = true
  973. break
  974. }
  975. }
  976. // Both nodeAddresses and nodeIPs can be reused for all nodePort services
  977. // and only need to be computed if we have at least one nodePort service.
  978. var (
  979. // List of node addresses to listen on if a nodePort is set.
  980. nodeAddresses []string
  981. // List of node IP addresses to be used as IPVS services if nodePort is set.
  982. nodeIPs []net.IP
  983. )
  984. if hasNodePort {
  985. nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
  986. if err != nil {
  987. klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
  988. }
  989. if err == nil && nodeAddrSet.Len() > 0 {
  990. nodeAddresses = nodeAddrSet.List()
  991. for _, address := range nodeAddresses {
  992. if utilproxy.IsZeroCIDR(address) {
  993. nodeIPs, err = proxier.ipGetter.NodeIPs()
  994. if err != nil {
  995. klog.Errorf("Failed to list all node IPs from host, err: %v", err)
  996. }
  997. break
  998. }
  999. nodeIPs = append(nodeIPs, net.ParseIP(address))
  1000. }
  1001. }
  1002. }
  1003. // Build IPVS rules for each service.
  1004. for svcName, svc := range proxier.serviceMap {
  1005. svcInfo, ok := svc.(*serviceInfo)
  1006. if !ok {
  1007. klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
  1008. continue
  1009. }
  1010. protocol := strings.ToLower(string(svcInfo.Protocol()))
  1011. // Precompute svcNameString; with many services the many calls
  1012. // to ServicePortName.String() show up in CPU profiles.
  1013. svcNameString := svcName.String()
  1014. // Handle traffic that loops back to the originator with SNAT.
  1015. for _, e := range proxier.endpointsMap[svcName] {
  1016. ep, ok := e.(*proxy.BaseEndpointInfo)
  1017. if !ok {
  1018. klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
  1019. continue
  1020. }
  1021. if !ep.IsLocal {
  1022. continue
  1023. }
  1024. epIP := ep.IP()
  1025. epPort, err := ep.Port()
  1026. // Error parsing this endpoint has been logged. Skip to next endpoint.
  1027. if epIP == "" || err != nil {
  1028. continue
  1029. }
  1030. entry := &utilipset.Entry{
  1031. IP: epIP,
  1032. Port: epPort,
  1033. Protocol: protocol,
  1034. IP2: epIP,
  1035. SetType: utilipset.HashIPPortIP,
  1036. }
  1037. if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
  1038. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
  1039. continue
  1040. }
  1041. proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
  1042. }
  1043. // Capture the clusterIP.
  1044. // ipset call
  1045. entry := &utilipset.Entry{
  1046. IP: svcInfo.ClusterIP().String(),
  1047. Port: svcInfo.Port(),
  1048. Protocol: protocol,
  1049. SetType: utilipset.HashIPPort,
  1050. }
  1051. // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1052. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1053. if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
  1054. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
  1055. continue
  1056. }
  1057. proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
  1058. // ipvs call
  1059. serv := &utilipvs.VirtualServer{
  1060. Address: svcInfo.ClusterIP(),
  1061. Port: uint16(svcInfo.Port()),
  1062. Protocol: string(svcInfo.Protocol()),
  1063. Scheduler: proxier.ipvsScheduler,
  1064. }
  1065. // Set session affinity flag and timeout for IPVS service
  1066. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1067. serv.Flags |= utilipvs.FlagPersistent
  1068. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1069. }
  1070. // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
  1071. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1072. activeIPVSServices[serv.String()] = true
  1073. activeBindAddrs[serv.Address.String()] = true
  1074. // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
  1075. // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
  1076. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
  1077. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1078. }
  1079. } else {
  1080. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1081. }
  1082. // Capture externalIPs.
  1083. for _, externalIP := range svcInfo.ExternalIPStrings() {
  1084. if len(localAddrs) == 0 {
  1085. klog.Errorf("couldn't find any local IPs, assuming %s is not local", externalIP)
  1086. } else if (svcInfo.Protocol() != v1.ProtocolSCTP) && ipExists(net.ParseIP(externalIP), localAddrs) {
  1087. // We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
  1088. lp := utilproxy.LocalPort{
  1089. Description: "externalIP for " + svcNameString,
  1090. IP: externalIP,
  1091. Port: svcInfo.Port(),
  1092. Protocol: protocol,
  1093. }
  1094. if proxier.portsMap[lp] != nil {
  1095. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  1096. replacementPortsMap[lp] = proxier.portsMap[lp]
  1097. } else {
  1098. socket, err := proxier.portMapper.OpenLocalPort(&lp)
  1099. if err != nil {
  1100. msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
  1101. proxier.recorder.Eventf(
  1102. &v1.ObjectReference{
  1103. Kind: "Node",
  1104. Name: proxier.hostname,
  1105. UID: types.UID(proxier.hostname),
  1106. Namespace: "",
  1107. }, v1.EventTypeWarning, err.Error(), msg)
  1108. klog.Error(msg)
  1109. continue
  1110. }
  1111. replacementPortsMap[lp] = socket
  1112. }
  1113. } // We're holding the port, so it's OK to install IPVS rules.
  1114. // ipset call
  1115. entry := &utilipset.Entry{
  1116. IP: externalIP,
  1117. Port: svcInfo.Port(),
  1118. Protocol: protocol,
  1119. SetType: utilipset.HashIPPort,
  1120. }
  1121. // We have to SNAT packets to external IPs.
  1122. if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
  1123. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
  1124. continue
  1125. }
  1126. proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
  1127. // ipvs call
  1128. serv := &utilipvs.VirtualServer{
  1129. Address: net.ParseIP(externalIP),
  1130. Port: uint16(svcInfo.Port()),
  1131. Protocol: string(svcInfo.Protocol()),
  1132. Scheduler: proxier.ipvsScheduler,
  1133. }
  1134. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1135. serv.Flags |= utilipvs.FlagPersistent
  1136. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1137. }
  1138. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1139. activeIPVSServices[serv.String()] = true
  1140. activeBindAddrs[serv.Address.String()] = true
  1141. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
  1142. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1143. }
  1144. } else {
  1145. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1146. }
  1147. }
  1148. // Capture load-balancer ingress.
  1149. for _, ingress := range svcInfo.LoadBalancerIPStrings() {
  1150. if ingress != "" {
  1151. // ipset call
  1152. entry = &utilipset.Entry{
  1153. IP: ingress,
  1154. Port: svcInfo.Port(),
  1155. Protocol: protocol,
  1156. SetType: utilipset.HashIPPort,
  1157. }
  1158. // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
  1159. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
  1160. // If we are proxying globally, we need to masquerade in case we cross nodes.
  1161. // If we are proxying only locally, we can retain the source IP.
  1162. if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
  1163. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
  1164. continue
  1165. }
  1166. proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
  1167. // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
  1168. if svcInfo.OnlyNodeLocalEndpoints() {
  1169. if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
  1170. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
  1171. continue
  1172. }
  1173. proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
  1174. }
  1175. if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
  1176. // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
  1177. // This currently works for loadbalancers that preserves source ips.
  1178. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
  1179. if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
  1180. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
  1181. continue
  1182. }
  1183. proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
  1184. allowFromNode := false
  1185. for _, src := range svcInfo.LoadBalancerSourceRanges() {
  1186. // ipset call
  1187. entry = &utilipset.Entry{
  1188. IP: ingress,
  1189. Port: svcInfo.Port(),
  1190. Protocol: protocol,
  1191. Net: src,
  1192. SetType: utilipset.HashIPPortNet,
  1193. }
  1194. // enumerate all white list source cidr
  1195. if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
  1196. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
  1197. continue
  1198. }
  1199. proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
  1200. // ignore error because it has been validated
  1201. _, cidr, _ := net.ParseCIDR(src)
  1202. if cidr.Contains(proxier.nodeIP) {
  1203. allowFromNode = true
  1204. }
  1205. }
  1206. // generally, ip route rule was added to intercept request to loadbalancer vip from the
  1207. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
  1208. // Need to add the following rule to allow request on host.
  1209. if allowFromNode {
  1210. entry = &utilipset.Entry{
  1211. IP: ingress,
  1212. Port: svcInfo.Port(),
  1213. Protocol: protocol,
  1214. IP2: ingress,
  1215. SetType: utilipset.HashIPPortIP,
  1216. }
  1217. // enumerate all white list source ip
  1218. if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
  1219. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name))
  1220. continue
  1221. }
  1222. proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
  1223. }
  1224. }
  1225. // ipvs call
  1226. serv := &utilipvs.VirtualServer{
  1227. Address: net.ParseIP(ingress),
  1228. Port: uint16(svcInfo.Port()),
  1229. Protocol: string(svcInfo.Protocol()),
  1230. Scheduler: proxier.ipvsScheduler,
  1231. }
  1232. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1233. serv.Flags |= utilipvs.FlagPersistent
  1234. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1235. }
  1236. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  1237. activeIPVSServices[serv.String()] = true
  1238. activeBindAddrs[serv.Address.String()] = true
  1239. if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
  1240. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1241. }
  1242. } else {
  1243. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1244. }
  1245. }
  1246. }
  1247. if svcInfo.NodePort() != 0 {
  1248. if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
  1249. // Skip nodePort configuration since an error occurred when
  1250. // computing nodeAddresses or nodeIPs.
  1251. continue
  1252. }
  1253. var lps []utilproxy.LocalPort
  1254. for _, address := range nodeAddresses {
  1255. lp := utilproxy.LocalPort{
  1256. Description: "nodePort for " + svcNameString,
  1257. IP: address,
  1258. Port: svcInfo.NodePort(),
  1259. Protocol: protocol,
  1260. }
  1261. if utilproxy.IsZeroCIDR(address) {
  1262. // Empty IP address means all
  1263. lp.IP = ""
  1264. lps = append(lps, lp)
  1265. // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
  1266. break
  1267. }
  1268. lps = append(lps, lp)
  1269. }
  1270. // For ports on node IPs, open the actual port and hold it.
  1271. for _, lp := range lps {
  1272. if proxier.portsMap[lp] != nil {
  1273. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  1274. replacementPortsMap[lp] = proxier.portsMap[lp]
  1275. // We do not start listening on SCTP ports, according to our agreement in the
  1276. // SCTP support KEP
  1277. } else if svcInfo.Protocol() != v1.ProtocolSCTP {
  1278. socket, err := proxier.portMapper.OpenLocalPort(&lp)
  1279. if err != nil {
  1280. klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
  1281. continue
  1282. }
  1283. if lp.Protocol == "udp" {
  1284. isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
  1285. conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
  1286. }
  1287. replacementPortsMap[lp] = socket
  1288. } // We're holding the port, so it's OK to install ipvs rules.
  1289. }
  1290. // Nodeports need SNAT, unless they're local.
  1291. // ipset call
  1292. var (
  1293. nodePortSet *IPSet
  1294. entries []*utilipset.Entry
  1295. )
  1296. switch protocol {
  1297. case "tcp":
  1298. nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
  1299. entries = []*utilipset.Entry{{
  1300. // No need to provide ip info
  1301. Port: svcInfo.NodePort(),
  1302. Protocol: protocol,
  1303. SetType: utilipset.BitmapPort,
  1304. }}
  1305. case "udp":
  1306. nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
  1307. entries = []*utilipset.Entry{{
  1308. // No need to provide ip info
  1309. Port: svcInfo.NodePort(),
  1310. Protocol: protocol,
  1311. SetType: utilipset.BitmapPort,
  1312. }}
  1313. case "sctp":
  1314. nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
  1315. // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
  1316. entries = []*utilipset.Entry{}
  1317. for _, nodeIP := range nodeIPs {
  1318. entries = append(entries, &utilipset.Entry{
  1319. IP: nodeIP.String(),
  1320. Port: svcInfo.NodePort(),
  1321. Protocol: protocol,
  1322. SetType: utilipset.HashIPPort,
  1323. })
  1324. }
  1325. default:
  1326. // It should never hit
  1327. klog.Errorf("Unsupported protocol type: %s", protocol)
  1328. }
  1329. if nodePortSet != nil {
  1330. entryInvalidErr := false
  1331. for _, entry := range entries {
  1332. if valid := nodePortSet.validateEntry(entry); !valid {
  1333. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
  1334. entryInvalidErr = true
  1335. break
  1336. }
  1337. nodePortSet.activeEntries.Insert(entry.String())
  1338. }
  1339. if entryInvalidErr {
  1340. continue
  1341. }
  1342. }
  1343. // Add externaltrafficpolicy=local type nodeport entry
  1344. if svcInfo.OnlyNodeLocalEndpoints() {
  1345. var nodePortLocalSet *IPSet
  1346. switch protocol {
  1347. case "tcp":
  1348. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
  1349. case "udp":
  1350. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
  1351. case "sctp":
  1352. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
  1353. default:
  1354. // It should never hit
  1355. klog.Errorf("Unsupported protocol type: %s", protocol)
  1356. }
  1357. if nodePortLocalSet != nil {
  1358. entryInvalidErr := false
  1359. for _, entry := range entries {
  1360. if valid := nodePortLocalSet.validateEntry(entry); !valid {
  1361. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
  1362. entryInvalidErr = true
  1363. break
  1364. }
  1365. nodePortLocalSet.activeEntries.Insert(entry.String())
  1366. }
  1367. if entryInvalidErr {
  1368. continue
  1369. }
  1370. }
  1371. }
  1372. // Build ipvs kernel routes for each node ip address
  1373. for _, nodeIP := range nodeIPs {
  1374. // ipvs call
  1375. serv := &utilipvs.VirtualServer{
  1376. Address: nodeIP,
  1377. Port: uint16(svcInfo.NodePort()),
  1378. Protocol: string(svcInfo.Protocol()),
  1379. Scheduler: proxier.ipvsScheduler,
  1380. }
  1381. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
  1382. serv.Flags |= utilipvs.FlagPersistent
  1383. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
  1384. }
  1385. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
  1386. if err := proxier.syncService(svcNameString, serv, false); err == nil {
  1387. activeIPVSServices[serv.String()] = true
  1388. if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
  1389. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  1390. }
  1391. } else {
  1392. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  1393. }
  1394. }
  1395. }
  1396. }
  1397. // sync ipset entries
  1398. for _, set := range proxier.ipsetList {
  1399. set.syncIPSetEntries()
  1400. }
  1401. // Tail call iptables rules for ipset, make sure only call iptables once
  1402. // in a single loop per ip set.
  1403. proxier.writeIptablesRules()
  1404. // Sync iptables rules.
  1405. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
  1406. proxier.iptablesData.Reset()
  1407. proxier.iptablesData.Write(proxier.natChains.Bytes())
  1408. proxier.iptablesData.Write(proxier.natRules.Bytes())
  1409. proxier.iptablesData.Write(proxier.filterChains.Bytes())
  1410. proxier.iptablesData.Write(proxier.filterRules.Bytes())
  1411. klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
  1412. err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  1413. if err != nil {
  1414. klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
  1415. metrics.IptablesRestoreFailuresTotal.Inc()
  1416. // Revert new local ports.
  1417. utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
  1418. return
  1419. }
  1420. for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
  1421. for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
  1422. latency := metrics.SinceInSeconds(lastChangeTriggerTime)
  1423. metrics.NetworkProgrammingLatency.Observe(latency)
  1424. klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
  1425. }
  1426. }
  1427. // Close old local ports and save new ones.
  1428. for k, v := range proxier.portsMap {
  1429. if replacementPortsMap[k] == nil {
  1430. v.Close()
  1431. }
  1432. }
  1433. proxier.portsMap = replacementPortsMap
  1434. // Get legacy bind address
  1435. // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
  1436. currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  1437. if err != nil {
  1438. klog.Errorf("Failed to get bind address, err: %v", err)
  1439. }
  1440. legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
  1441. // Clean up legacy IPVS services and unbind addresses
  1442. appliedSvcs, err := proxier.ipvs.GetVirtualServers()
  1443. if err == nil {
  1444. for _, appliedSvc := range appliedSvcs {
  1445. currentIPVSServices[appliedSvc.String()] = appliedSvc
  1446. }
  1447. } else {
  1448. klog.Errorf("Failed to get ipvs service, err: %v", err)
  1449. }
  1450. proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
  1451. if proxier.healthzServer != nil {
  1452. proxier.healthzServer.Updated()
  1453. }
  1454. metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1455. // Update service healthchecks. The endpoints list might include services that are
  1456. // not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1457. // will just drop those endpoints.
  1458. if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
  1459. klog.Errorf("Error syncing healthcheck services: %v", err)
  1460. }
  1461. if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
  1462. klog.Errorf("Error syncing healthcheck endpoints: %v", err)
  1463. }
  1464. // Finish housekeeping.
  1465. // TODO: these could be made more consistent.
  1466. for _, svcIP := range staleServices.UnsortedList() {
  1467. if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
  1468. klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
  1469. }
  1470. }
  1471. proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
  1472. }
  1473. // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
  1474. // according to proxier.ipsetList information and the ipset match relationship that `ipsetWithIptablesChain` specified.
  1475. // some ipset(kubeClusterIPSet for example) have particular match rules and iptables jump relation should be sync separately.
  1476. func (proxier *Proxier) writeIptablesRules() {
  1477. // We are creating those slices ones here to avoid memory reallocations
  1478. // in every loop. Note that reuse the memory, instead of doing:
  1479. // slice = <some new slice>
  1480. // you should always do one of the below:
  1481. // slice = slice[:0] // and then append to it
  1482. // slice = append(slice[:0], ...)
  1483. // To avoid growing this slice, we arbitrarily set its size to 64,
  1484. // there is never more than that many arguments for a single line.
  1485. // Note that even if we go over 64, it will still be correct - it
  1486. // is just for efficiency, not correctness.
  1487. args := make([]string, 64)
  1488. for _, set := range ipsetWithIptablesChain {
  1489. if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
  1490. args = append(args[:0], "-A", set.from)
  1491. if set.protocolMatch != "" {
  1492. args = append(args, "-p", set.protocolMatch)
  1493. }
  1494. args = append(args,
  1495. "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
  1496. "-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
  1497. set.matchType,
  1498. )
  1499. writeLine(proxier.natRules, append(args, "-j", set.to)...)
  1500. }
  1501. }
  1502. if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
  1503. args = append(args[:0],
  1504. "-A", string(kubeServicesChain),
  1505. "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
  1506. "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
  1507. )
  1508. if proxier.masqueradeAll {
  1509. writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
  1510. } else if len(proxier.clusterCIDR) > 0 {
  1511. // This masquerades off-cluster traffic to a service VIP. The idea
  1512. // is that you can establish a static route for your Service range,
  1513. // routing to any node, and that node will bridge into the Service
  1514. // for you. Since that might bounce off-node, we masquerade here.
  1515. // If/when we support "Local" policy for VIPs, we should update this.
  1516. writeLine(proxier.natRules, append(args, "dst,dst", "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
  1517. } else {
  1518. // Masquerade all OUTPUT traffic coming from a service ip.
  1519. // The kube dummy interface has all service VIPs assigned which
  1520. // results in the service VIP being picked as the source IP to reach
  1521. // a VIP. This leads to a connection from VIP:<random port> to
  1522. // VIP:<service port>.
  1523. // Always masquerading OUTPUT (node-originating) traffic with a VIP
  1524. // source ip and service port destination fixes the outgoing connections.
  1525. writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
  1526. }
  1527. }
  1528. if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
  1529. // Build masquerade rules for packets to external IPs.
  1530. args = append(args[:0],
  1531. "-A", string(kubeServicesChain),
  1532. "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
  1533. "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
  1534. "dst,dst",
  1535. )
  1536. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
  1537. // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
  1538. // nor from a local process to be forwarded to the service.
  1539. // This rule roughly translates to "all traffic from off-machine".
  1540. // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
  1541. externalTrafficOnlyArgs := append(args,
  1542. "-m", "physdev", "!", "--physdev-is-in",
  1543. "-m", "addrtype", "!", "--src-type", "LOCAL")
  1544. writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
  1545. dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  1546. // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
  1547. // This covers cases like GCE load-balancers which get added to the local routing table.
  1548. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
  1549. }
  1550. // -A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODE-PORT
  1551. args = append(args[:0],
  1552. "-A", string(kubeServicesChain),
  1553. "-m", "addrtype", "--dst-type", "LOCAL",
  1554. )
  1555. writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
  1556. // mark drop for KUBE-LOAD-BALANCER
  1557. writeLine(proxier.natRules, []string{
  1558. "-A", string(KubeLoadBalancerChain),
  1559. "-j", string(KubeMarkMasqChain),
  1560. }...)
  1561. // mark drop for KUBE-FIRE-WALL
  1562. writeLine(proxier.natRules, []string{
  1563. "-A", string(KubeFireWallChain),
  1564. "-j", string(KubeMarkDropChain),
  1565. }...)
  1566. // Accept all traffic with destination of ipvs virtual service, in case other iptables rules
  1567. // block the traffic, that may result in ipvs rules invalid.
  1568. // Those rules must be in the end of KUBE-SERVICE chain
  1569. proxier.acceptIPVSTraffic()
  1570. // If the masqueradeMark has been added then we want to forward that same
  1571. // traffic, this allows NodePort traffic to be forwarded even if the default
  1572. // FORWARD policy is not accept.
  1573. writeLine(proxier.filterRules,
  1574. "-A", string(KubeForwardChain),
  1575. "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  1576. "-m", "mark", "--mark", proxier.masqueradeMark,
  1577. "-j", "ACCEPT",
  1578. )
  1579. // The following rules can only be set if clusterCIDR has been defined.
  1580. if len(proxier.clusterCIDR) != 0 {
  1581. // The following two rules ensure the traffic after the initial packet
  1582. // accepted by the "kubernetes forwarding rules" rule above will be
  1583. // accepted, to be as specific as possible the traffic must be sourced
  1584. // or destined to the clusterCIDR (to/from a pod).
  1585. writeLine(proxier.filterRules,
  1586. "-A", string(KubeForwardChain),
  1587. "-s", proxier.clusterCIDR,
  1588. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
  1589. "-m", "conntrack",
  1590. "--ctstate", "RELATED,ESTABLISHED",
  1591. "-j", "ACCEPT",
  1592. )
  1593. writeLine(proxier.filterRules,
  1594. "-A", string(KubeForwardChain),
  1595. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
  1596. "-d", proxier.clusterCIDR,
  1597. "-m", "conntrack",
  1598. "--ctstate", "RELATED,ESTABLISHED",
  1599. "-j", "ACCEPT",
  1600. )
  1601. }
  1602. // Write the end-of-table markers.
  1603. writeLine(proxier.filterRules, "COMMIT")
  1604. writeLine(proxier.natRules, "COMMIT")
  1605. }
  1606. func (proxier *Proxier) acceptIPVSTraffic() {
  1607. sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
  1608. for _, set := range sets {
  1609. var matchType string
  1610. if !proxier.ipsetList[set].isEmpty() {
  1611. switch proxier.ipsetList[set].SetType {
  1612. case utilipset.BitmapPort:
  1613. matchType = "dst"
  1614. default:
  1615. matchType = "dst,dst"
  1616. }
  1617. writeLine(proxier.natRules, []string{
  1618. "-A", string(kubeServicesChain),
  1619. "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
  1620. "-j", "ACCEPT",
  1621. }...)
  1622. }
  1623. }
  1624. }
  1625. // createAndLinkeKubeChain create all kube chains that ipvs proxier need and write basic link.
  1626. func (proxier *Proxier) createAndLinkeKubeChain() {
  1627. existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
  1628. existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
  1629. // Make sure we keep stats for the top-level chains
  1630. for _, ch := range iptablesChains {
  1631. if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
  1632. klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
  1633. return
  1634. }
  1635. if ch.table == utiliptables.TableNAT {
  1636. if chain, ok := existingNATChains[ch.chain]; ok {
  1637. writeBytesLine(proxier.natChains, chain)
  1638. } else {
  1639. writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
  1640. }
  1641. } else {
  1642. if chain, ok := existingFilterChains[KubeForwardChain]; ok {
  1643. writeBytesLine(proxier.filterChains, chain)
  1644. } else {
  1645. writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
  1646. }
  1647. }
  1648. }
  1649. for _, jc := range iptablesJumpChain {
  1650. args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
  1651. if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
  1652. klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
  1653. }
  1654. }
  1655. // Install the kubernetes-specific postrouting rules. We use a whole chain for
  1656. // this so that it is easier to flush and change, for example if the mark
  1657. // value should ever change.
  1658. // NB: THIS MUST MATCH the corresponding code in the kubelet
  1659. masqRule := []string{
  1660. "-A", string(kubePostroutingChain),
  1661. "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
  1662. "-m", "mark", "--mark", proxier.masqueradeMark,
  1663. "-j", "MASQUERADE",
  1664. }
  1665. if proxier.iptables.HasRandomFully() {
  1666. masqRule = append(masqRule, "--random-fully")
  1667. klog.V(3).Info("Using `--random-fully` in the MASQUERADE rule for iptables")
  1668. } else {
  1669. klog.V(2).Info("Not using `--random-fully` in the MASQUERADE rule for iptables because the local version of iptables does not support it")
  1670. }
  1671. writeLine(proxier.natRules, masqRule...)
  1672. // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
  1673. // this so that it is easier to flush and change, for example if the mark
  1674. // value should ever change.
  1675. writeLine(proxier.natRules, []string{
  1676. "-A", string(KubeMarkMasqChain),
  1677. "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
  1678. }...)
  1679. }
  1680. // getExistingChains get iptables-save output so we can check for existing chains and rules.
  1681. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
  1682. // Result may SHARE memory with contents of buffer.
  1683. func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptables.Table) map[utiliptables.Chain][]byte {
  1684. buffer.Reset()
  1685. err := proxier.iptables.SaveInto(table, buffer)
  1686. if err != nil { // if we failed to get any rules
  1687. klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
  1688. } else { // otherwise parse the output
  1689. return utiliptables.GetChainLines(table, buffer.Bytes())
  1690. }
  1691. return nil
  1692. }
  1693. // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
  1694. // risk sending more traffic to it, all of which will be lost (because UDP).
  1695. // This assumes the proxier mutex is held
  1696. func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
  1697. for _, epSvcPair := range connectionMap {
  1698. if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
  1699. endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
  1700. err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
  1701. if err != nil {
  1702. klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
  1703. }
  1704. for _, extIP := range svcInfo.ExternalIPStrings() {
  1705. err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
  1706. if err != nil {
  1707. klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
  1708. }
  1709. }
  1710. for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
  1711. err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
  1712. if err != nil {
  1713. klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
  1714. }
  1715. }
  1716. }
  1717. }
  1718. }
  1719. func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
  1720. appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  1721. if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
  1722. if appliedVirtualServer == nil {
  1723. // IPVS service is not found, create a new service
  1724. klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
  1725. if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
  1726. klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
  1727. return err
  1728. }
  1729. } else {
  1730. // IPVS service was changed, update the existing one
  1731. // During updates, service VIP will not go down
  1732. klog.V(3).Infof("IPVS service %s was changed", svcName)
  1733. if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
  1734. klog.Errorf("Failed to update IPVS service, err:%v", err)
  1735. return err
  1736. }
  1737. }
  1738. }
  1739. // bind service address to dummy interface even if service not changed,
  1740. // in case that service IP was removed by other processes
  1741. if bindAddr {
  1742. klog.V(4).Infof("Bind addr %s", vs.Address.String())
  1743. _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
  1744. if err != nil {
  1745. klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
  1746. return err
  1747. }
  1748. }
  1749. return nil
  1750. }
  1751. func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
  1752. appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
  1753. if err != nil || appliedVirtualServer == nil {
  1754. klog.Errorf("Failed to get IPVS service, error: %v", err)
  1755. return err
  1756. }
  1757. // curEndpoints represents IPVS destinations listed from current system.
  1758. curEndpoints := sets.NewString()
  1759. // newEndpoints represents Endpoints watched from API Server.
  1760. newEndpoints := sets.NewString()
  1761. curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
  1762. if err != nil {
  1763. klog.Errorf("Failed to list IPVS destinations, error: %v", err)
  1764. return err
  1765. }
  1766. for _, des := range curDests {
  1767. curEndpoints.Insert(des.String())
  1768. }
  1769. endpoints := proxier.endpointsMap[svcPortName]
  1770. // Service Topology will not be enabled in the following cases:
  1771. // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
  1772. // 2. ServiceTopology is not enabled.
  1773. // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
  1774. // to get topology information).
  1775. if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
  1776. endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
  1777. }
  1778. for _, epInfo := range endpoints {
  1779. if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
  1780. continue
  1781. }
  1782. newEndpoints.Insert(epInfo.String())
  1783. }
  1784. // Create new endpoints
  1785. for _, ep := range newEndpoints.List() {
  1786. ip, port, err := net.SplitHostPort(ep)
  1787. if err != nil {
  1788. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1789. continue
  1790. }
  1791. portNum, err := strconv.Atoi(port)
  1792. if err != nil {
  1793. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1794. continue
  1795. }
  1796. newDest := &utilipvs.RealServer{
  1797. Address: net.ParseIP(ip),
  1798. Port: uint16(portNum),
  1799. Weight: 1,
  1800. }
  1801. if curEndpoints.Has(ep) {
  1802. // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
  1803. uniqueRS := GetUniqueRSName(vs, newDest)
  1804. if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1805. continue
  1806. }
  1807. klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
  1808. err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
  1809. if err != nil {
  1810. klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
  1811. continue
  1812. }
  1813. }
  1814. err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
  1815. if err != nil {
  1816. klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
  1817. continue
  1818. }
  1819. }
  1820. // Delete old endpoints
  1821. for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
  1822. // if curEndpoint is in gracefulDelete, skip
  1823. uniqueRS := vs.String() + "/" + ep
  1824. if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  1825. continue
  1826. }
  1827. ip, port, err := net.SplitHostPort(ep)
  1828. if err != nil {
  1829. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  1830. continue
  1831. }
  1832. portNum, err := strconv.Atoi(port)
  1833. if err != nil {
  1834. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  1835. continue
  1836. }
  1837. delDest := &utilipvs.RealServer{
  1838. Address: net.ParseIP(ip),
  1839. Port: uint16(portNum),
  1840. }
  1841. klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
  1842. err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
  1843. if err != nil {
  1844. klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
  1845. continue
  1846. }
  1847. }
  1848. return nil
  1849. }
  1850. func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
  1851. isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
  1852. for cs := range currentServices {
  1853. svc := currentServices[cs]
  1854. if proxier.isIPInExcludeCIDRs(svc.Address) {
  1855. continue
  1856. }
  1857. if utilnet.IsIPv6(svc.Address) != isIPv6 {
  1858. // Not our family
  1859. continue
  1860. }
  1861. if _, ok := activeServices[cs]; !ok {
  1862. klog.V(4).Infof("Delete service %s", svc.String())
  1863. if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
  1864. klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
  1865. }
  1866. addr := svc.Address.String()
  1867. if _, ok := legacyBindAddrs[addr]; ok {
  1868. klog.V(4).Infof("Unbinding address %s", addr)
  1869. if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
  1870. klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
  1871. } else {
  1872. // In case we delete a multi-port service, avoid trying to unbind multiple times
  1873. delete(legacyBindAddrs, addr)
  1874. }
  1875. }
  1876. }
  1877. }
  1878. }
  1879. func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
  1880. // make sure it does not fall within an excluded CIDR range.
  1881. for _, excludedCIDR := range proxier.excludeCIDRs {
  1882. if excludedCIDR.Contains(ip) {
  1883. return true
  1884. }
  1885. }
  1886. return false
  1887. }
  1888. func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
  1889. legacyAddrs := make(map[string]bool)
  1890. isIpv6 := utilnet.IsIPv6(proxier.nodeIP)
  1891. for _, addr := range currentBindAddrs {
  1892. addrIsIpv6 := utilnet.IsIPv6(net.ParseIP(addr))
  1893. if addrIsIpv6 && !isIpv6 || !addrIsIpv6 && isIpv6 {
  1894. continue
  1895. }
  1896. if _, ok := activeBindAddrs[addr]; !ok {
  1897. legacyAddrs[addr] = true
  1898. }
  1899. }
  1900. return legacyAddrs
  1901. }
  1902. // Join all words with spaces, terminate with newline and write to buff.
  1903. func writeLine(buf *bytes.Buffer, words ...string) {
  1904. // We avoid strings.Join for performance reasons.
  1905. for i := range words {
  1906. buf.WriteString(words[i])
  1907. if i < len(words)-1 {
  1908. buf.WriteByte(' ')
  1909. } else {
  1910. buf.WriteByte('\n')
  1911. }
  1912. }
  1913. }
  1914. func writeBytesLine(buf *bytes.Buffer, bytes []byte) {
  1915. buf.Write(bytes)
  1916. buf.WriteByte('\n')
  1917. }
  1918. // listenPortOpener opens ports by calling bind() and listen().
  1919. type listenPortOpener struct{}
  1920. // OpenLocalPort holds the given local port open.
  1921. func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
  1922. return openLocalPort(lp)
  1923. }
  1924. func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
  1925. // For ports on node IPs, open the actual port and hold it, even though we
  1926. // use ipvs to redirect traffic.
  1927. // This ensures a) that it's safe to use that port and b) that (a) stays
  1928. // true. The risk is that some process on the node (e.g. sshd or kubelet)
  1929. // is using a port and we give that same port out to a Service. That would
  1930. // be bad because ipvs would silently claim the traffic but the process
  1931. // would never know.
  1932. // NOTE: We should not need to have a real listen()ing socket - bind()
  1933. // should be enough, but I can't figure out a way to e2e test without
  1934. // it. Tools like 'ss' and 'netstat' do not show sockets that are
  1935. // bind()ed but not listen()ed, and at least the default debian netcat
  1936. // has no way to avoid about 10 seconds of retries.
  1937. var socket utilproxy.Closeable
  1938. switch lp.Protocol {
  1939. case "tcp":
  1940. listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1941. if err != nil {
  1942. return nil, err
  1943. }
  1944. socket = listener
  1945. case "udp":
  1946. addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
  1947. if err != nil {
  1948. return nil, err
  1949. }
  1950. conn, err := net.ListenUDP("udp", addr)
  1951. if err != nil {
  1952. return nil, err
  1953. }
  1954. socket = conn
  1955. default:
  1956. return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
  1957. }
  1958. klog.V(2).Infof("Opened local port %s", lp.String())
  1959. return socket, nil
  1960. }
// The ipvs Proxier falls back on iptables when it needs to do SNAT for egress packets.
// It only operates on the iptables *nat table.
  1963. // Create and link the kube postrouting chain for SNAT packets.
  1964. // Chain POSTROUTING (policy ACCEPT)
  1965. // target prot opt source destination
// KUBE-POSTROUTING all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes postrouting rules */
// Maintained by kubelet network sync loop
  1968. // *nat
  1969. // :KUBE-POSTROUTING - [0:0]
  1970. // Chain KUBE-POSTROUTING (1 references)
  1971. // target prot opt source destination
  1972. // MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
  1973. // :KUBE-MARK-MASQ - [0:0]
  1974. // Chain KUBE-MARK-MASQ (0 references)
  1975. // target prot opt source destination
  1976. // MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000