12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package iptables
- //
- // NOTE: this needs to be tested in e2e since it uses iptables for everything.
- //
- import (
- "bytes"
- "crypto/sha256"
- "encoding/base32"
- "fmt"
- "net"
- "reflect"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/tools/record"
- "k8s.io/klog"
- "k8s.io/kubernetes/pkg/features"
- "k8s.io/kubernetes/pkg/proxy"
- "k8s.io/kubernetes/pkg/proxy/healthcheck"
- "k8s.io/kubernetes/pkg/proxy/metaproxier"
- "k8s.io/kubernetes/pkg/proxy/metrics"
- utilproxy "k8s.io/kubernetes/pkg/proxy/util"
- "k8s.io/kubernetes/pkg/util/async"
- "k8s.io/kubernetes/pkg/util/conntrack"
- utiliptables "k8s.io/kubernetes/pkg/util/iptables"
- utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
- utilexec "k8s.io/utils/exec"
- utilnet "k8s.io/utils/net"
- )
- const (
- // the services chain
- kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
- // the external services chain
- kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"
- // the nodeports chain
- kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
- // the kubernetes postrouting chain
- kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
- // KubeMarkMasqChain is the mark-for-masquerade chain
- KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
- // KubeMarkDropChain is the mark-for-drop chain
- KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
- // the kubernetes forward chain
- kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
- )
// KernelCompatTester tests whether the required kernel capabilities are
// present to run the iptables proxier.
type KernelCompatTester interface {
	// IsCompatible returns a non-nil error when a required capability is
	// missing.
	IsCompatible() error
}

// CanUseIPTablesProxier returns true if we should use the iptables Proxier
// instead of the "classic" userspace Proxier. The answer is true exactly when
// kcompat.IsCompatible reports no error; that error, if any, is returned as-is.
func CanUseIPTablesProxier(kcompat KernelCompatTester) (bool, error) {
	err := kcompat.IsCompatible()
	return err == nil, err
}
- var _ KernelCompatTester = LinuxKernelCompatTester{}
- // LinuxKernelCompatTester is the Linux implementation of KernelCompatTester
- type LinuxKernelCompatTester struct{}
- // IsCompatible checks for the required sysctls. We don't care about the value, just
- // that it exists. If this Proxier is chosen, we'll initialize it as we
- // need.
- func (lkct LinuxKernelCompatTester) IsCompatible() error {
- _, err := utilsysctl.New().GetSysctl(sysctlRouteLocalnet)
- return err
- }
- const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
- const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
- // internal struct for string service information
- type serviceInfo struct {
- *proxy.BaseServiceInfo
- // The following fields are computed and stored for performance reasons.
- serviceNameString string
- servicePortChainName utiliptables.Chain
- serviceFirewallChainName utiliptables.Chain
- serviceLBChainName utiliptables.Chain
- }
- // returns a new proxy.ServicePort which abstracts a serviceInfo
- func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
- info := &serviceInfo{BaseServiceInfo: baseInfo}
- // Store the following for performance reasons.
- svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
- svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
- protocol := strings.ToLower(string(info.Protocol()))
- info.serviceNameString = svcPortName.String()
- info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
- info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
- info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol)
- return info
- }
- // internal struct for endpoints information
- type endpointsInfo struct {
- *proxy.BaseEndpointInfo
- // The following fields we lazily compute and store here for performance
- // reasons. If the protocol is the same as you expect it to be, then the
- // chainName can be reused, otherwise it should be recomputed.
- protocol string
- chainName utiliptables.Chain
- }
- // returns a new proxy.Endpoint which abstracts a endpointsInfo
- func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
- return &endpointsInfo{BaseEndpointInfo: baseInfo}
- }
- // Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo.
- func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
- o, ok := other.(*endpointsInfo)
- if !ok {
- klog.Error("Failed to cast endpointsInfo")
- return false
- }
- return e.Endpoint == o.Endpoint &&
- e.IsLocal == o.IsLocal &&
- e.protocol == o.protocol &&
- e.chainName == o.chainName
- }
- // Returns the endpoint chain name for a given endpointsInfo.
- func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
- if e.protocol != protocol {
- e.protocol = protocol
- e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.Endpoint)
- }
- return e.chainName
- }
- // Proxier is an iptables based proxy for connections between a localhost:lport
- // and services that provide the actual backends.
- type Proxier struct {
- // endpointsChanges and serviceChanges contains all changes to endpoints and
- // services that happened since iptables was synced. For a single object,
- // changes are accumulated, i.e. previous is state from before all of them,
- // current is state after applying all of those.
- endpointsChanges *proxy.EndpointChangeTracker
- serviceChanges *proxy.ServiceChangeTracker
- mu sync.Mutex // protects the following fields
- serviceMap proxy.ServiceMap
- endpointsMap proxy.EndpointsMap
- portsMap map[utilproxy.LocalPort]utilproxy.Closeable
- nodeLabels map[string]string
- // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
- // when corresponding objects are synced after startup. This is used to avoid
- // updating iptables with some partial data after kube-proxy restart.
- endpointsSynced bool
- endpointSlicesSynced bool
- servicesSynced bool
- initialized int32
- syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
- syncPeriod time.Duration
- // These are effectively const and do not need the mutex to be held.
- iptables utiliptables.Interface
- masqueradeAll bool
- masqueradeMark string
- exec utilexec.Interface
- clusterCIDR string
- hostname string
- nodeIP net.IP
- portMapper utilproxy.PortOpener
- recorder record.EventRecorder
- serviceHealthServer healthcheck.ServiceHealthServer
- healthzServer healthcheck.ProxierHealthUpdater
- // Since converting probabilities (floats) to strings is expensive
- // and we are using only probabilities in the format of 1/n, we are
- // precomputing some number of those and cache for future reuse.
- precomputedProbabilities []string
- // The following buffers are used to reuse memory and avoid allocations
- // that are significantly impacting performance.
- iptablesData *bytes.Buffer
- existingFilterChainsData *bytes.Buffer
- filterChains *bytes.Buffer
- filterRules *bytes.Buffer
- natChains *bytes.Buffer
- natRules *bytes.Buffer
- // endpointChainsNumber is the total amount of endpointChains across all
- // services that we will generate (it is computed at the beginning of
- // syncProxyRules method). If that is large enough, comments in some
- // iptable rules are dropped to improve performance.
- endpointChainsNumber int
- // Values are as a parameter to select the interfaces where nodeport works.
- nodePortAddresses []string
- // networkInterfacer defines an interface for several net library functions.
- // Inject for test purpose.
- networkInterfacer utilproxy.NetworkInterfacer
- }
- // listenPortOpener opens ports by calling bind() and listen().
- type listenPortOpener struct{}
- // OpenLocalPort holds the given local port open.
- func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
- return openLocalPort(lp)
- }
- // Proxier implements proxy.Provider
- var _ proxy.Provider = &Proxier{}
- // NewProxier returns a new Proxier given an iptables Interface instance.
- // Because of the iptables logic, it is assumed that there is only a single Proxier active on a machine.
- // An error will be returned if iptables fails to update or acquire the initial lock.
- // Once a proxier is created, it will keep iptables up to date in the background and
- // will not terminate if a particular iptables call fails.
- func NewProxier(ipt utiliptables.Interface,
- sysctl utilsysctl.Interface,
- exec utilexec.Interface,
- syncPeriod time.Duration,
- minSyncPeriod time.Duration,
- masqueradeAll bool,
- masqueradeBit int,
- clusterCIDR string,
- hostname string,
- nodeIP net.IP,
- recorder record.EventRecorder,
- healthzServer healthcheck.ProxierHealthUpdater,
- nodePortAddresses []string,
- ) (*Proxier, error) {
- // Set the route_localnet sysctl we need for
- if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
- if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
- return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
- }
- }
- // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
- // are connected to a Linux bridge (but not SDN bridges). Until most
- // plugins handle this, log when config is missing
- if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
- klog.Warning("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
- }
- // Generate the masquerade mark to use for SNAT rules.
- masqueradeValue := 1 << uint(masqueradeBit)
- masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
- if len(clusterCIDR) == 0 {
- klog.Warning("clusterCIDR not specified, unable to distinguish between internal and external traffic")
- } else if utilnet.IsIPv6CIDRString(clusterCIDR) != ipt.IsIpv6() {
- return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, ipt.IsIpv6())
- }
- endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
- serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
- isIPv6 := ipt.IsIpv6()
- proxier := &Proxier{
- portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
- serviceMap: make(proxy.ServiceMap),
- serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
- endpointsMap: make(proxy.EndpointsMap),
- endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
- syncPeriod: syncPeriod,
- iptables: ipt,
- masqueradeAll: masqueradeAll,
- masqueradeMark: masqueradeMark,
- exec: exec,
- clusterCIDR: clusterCIDR,
- hostname: hostname,
- nodeIP: nodeIP,
- portMapper: &listenPortOpener{},
- recorder: recorder,
- serviceHealthServer: serviceHealthServer,
- healthzServer: healthzServer,
- precomputedProbabilities: make([]string, 0, 1001),
- iptablesData: bytes.NewBuffer(nil),
- existingFilterChainsData: bytes.NewBuffer(nil),
- filterChains: bytes.NewBuffer(nil),
- filterRules: bytes.NewBuffer(nil),
- natChains: bytes.NewBuffer(nil),
- natRules: bytes.NewBuffer(nil),
- nodePortAddresses: nodePortAddresses,
- networkInterfacer: utilproxy.RealNetwork{},
- }
- burstSyncs := 2
- klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
- // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
- // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
- // time.Hour is arbitrary.
- proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
- go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
- []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
- proxier.syncProxyRules, syncPeriod, wait.NeverStop)
- return proxier, nil
- }
- // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
- func NewDualStackProxier(
- ipt [2]utiliptables.Interface,
- sysctl utilsysctl.Interface,
- exec utilexec.Interface,
- syncPeriod time.Duration,
- minSyncPeriod time.Duration,
- masqueradeAll bool,
- masqueradeBit int,
- clusterCIDR [2]string,
- hostname string,
- nodeIP [2]net.IP,
- recorder record.EventRecorder,
- healthzServer healthcheck.ProxierHealthUpdater,
- nodePortAddresses []string,
- ) (proxy.Provider, error) {
- // Create an ipv4 instance of the single-stack proxier
- ipv4Proxier, err := NewProxier(ipt[0], sysctl,
- exec, syncPeriod, minSyncPeriod,
- masqueradeAll, masqueradeBit, clusterCIDR[0], hostname, nodeIP[0],
- recorder, healthzServer, nodePortAddresses)
- if err != nil {
- return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
- }
- ipv6Proxier, err := NewProxier(ipt[1], sysctl,
- exec, syncPeriod, minSyncPeriod,
- masqueradeAll, masqueradeBit, clusterCIDR[1], hostname, nodeIP[1],
- recorder, healthzServer, nodePortAddresses)
- if err != nil {
- return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
- }
- return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil // TODO move meta-proxier to mode-neutral package
- }
- type iptablesJumpChain struct {
- table utiliptables.Table
- dstChain utiliptables.Chain
- srcChain utiliptables.Chain
- comment string
- extraArgs []string
- }
- var iptablesJumpChains = []iptablesJumpChain{
- {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
- {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
- {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
- {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
- {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
- {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
- {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
- {utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
- }
- var iptablesCleanupOnlyChains = []iptablesJumpChain{}
- // CleanupLeftovers removes all iptables rules and chains created by the Proxier
- // It returns true if an error was encountered. Errors are logged.
- func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
- // Unlink our chains
- for _, jump := range append(iptablesJumpChains, iptablesCleanupOnlyChains...) {
- args := append(jump.extraArgs,
- "-m", "comment", "--comment", jump.comment,
- "-j", string(jump.dstChain),
- )
- if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
- if !utiliptables.IsNotFoundError(err) {
- klog.Errorf("Error removing pure-iptables proxy rule: %v", err)
- encounteredError = true
- }
- }
- }
- // Flush and remove all of our "-t nat" chains.
- iptablesData := bytes.NewBuffer(nil)
- if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
- klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
- encounteredError = true
- } else {
- existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes())
- natChains := bytes.NewBuffer(nil)
- natRules := bytes.NewBuffer(nil)
- writeLine(natChains, "*nat")
- // Start with chains we know we need to remove.
- for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
- if _, found := existingNATChains[chain]; found {
- chainString := string(chain)
- writeBytesLine(natChains, existingNATChains[chain]) // flush
- writeLine(natRules, "-X", chainString) // delete
- }
- }
- // Hunt for service and endpoint chains.
- for chain := range existingNATChains {
- chainString := string(chain)
- if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") {
- writeBytesLine(natChains, existingNATChains[chain]) // flush
- writeLine(natRules, "-X", chainString) // delete
- }
- }
- writeLine(natRules, "COMMIT")
- natLines := append(natChains.Bytes(), natRules.Bytes()...)
- // Write it.
- err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
- if err != nil {
- klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err)
- metrics.IptablesRestoreFailuresTotal.Inc()
- encounteredError = true
- }
- }
- // Flush and remove all of our "-t filter" chains.
- iptablesData.Reset()
- if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
- klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err)
- encounteredError = true
- } else {
- existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes())
- filterChains := bytes.NewBuffer(nil)
- filterRules := bytes.NewBuffer(nil)
- writeLine(filterChains, "*filter")
- for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
- if _, found := existingFilterChains[chain]; found {
- chainString := string(chain)
- writeBytesLine(filterChains, existingFilterChains[chain])
- writeLine(filterRules, "-X", chainString)
- }
- }
- writeLine(filterRules, "COMMIT")
- filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
- // Write it.
- if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
- klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err)
- metrics.IptablesRestoreFailuresTotal.Inc()
- encounteredError = true
- }
- }
- return encounteredError
- }
// computeProbability returns 1/n formatted with exactly ten digits after the
// decimal point, e.g. "0.5000000000" for n == 2.
func computeProbability(n int) string {
	share := 1.0 / float64(n)
	return fmt.Sprintf("%0.10f", share)
}
- // This assumes proxier.mu is held
- func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
- if len(proxier.precomputedProbabilities) == 0 {
- proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
- }
- for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
- proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
- }
- }
- // This assumes proxier.mu is held
- func (proxier *Proxier) probability(n int) string {
- if n >= len(proxier.precomputedProbabilities) {
- proxier.precomputeProbabilities(n)
- }
- return proxier.precomputedProbabilities[n]
- }
- // Sync is called to synchronize the proxier state to iptables as soon as possible.
- func (proxier *Proxier) Sync() {
- if proxier.healthzServer != nil {
- proxier.healthzServer.QueuedUpdate()
- }
- proxier.syncRunner.Run()
- }
- // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
- func (proxier *Proxier) SyncLoop() {
- // Update healthz timestamp at beginning in case Sync() never succeeds.
- if proxier.healthzServer != nil {
- proxier.healthzServer.Updated()
- }
- proxier.syncRunner.Loop(wait.NeverStop)
- }
- func (proxier *Proxier) setInitialized(value bool) {
- var initialized int32
- if value {
- initialized = 1
- }
- atomic.StoreInt32(&proxier.initialized, initialized)
- }
- func (proxier *Proxier) isInitialized() bool {
- return atomic.LoadInt32(&proxier.initialized) > 0
- }
- // OnServiceAdd is called whenever creation of new service object
- // is observed.
- func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
- proxier.OnServiceUpdate(nil, service)
- }
- // OnServiceUpdate is called whenever modification of an existing
- // service object is observed.
- func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
- if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
- proxier.Sync()
- }
- }
- // OnServiceDelete is called whenever deletion of an existing service
- // object is observed.
- func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
- proxier.OnServiceUpdate(service, nil)
- }
- // OnServiceSynced is called once all the initial even handlers were
- // called and the state is fully propagated to local cache.
- func (proxier *Proxier) OnServiceSynced() {
- proxier.mu.Lock()
- proxier.servicesSynced = true
- if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
- proxier.setInitialized(proxier.endpointSlicesSynced)
- } else {
- proxier.setInitialized(proxier.endpointsSynced)
- }
- proxier.mu.Unlock()
- // Sync unconditionally - this is called once per lifetime.
- proxier.syncProxyRules()
- }
- // OnEndpointsAdd is called whenever creation of new endpoints object
- // is observed.
- func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
- proxier.OnEndpointsUpdate(nil, endpoints)
- }
- // OnEndpointsUpdate is called whenever modification of an existing
- // endpoints object is observed.
- func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
- if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
- proxier.Sync()
- }
- }
- // OnEndpointsDelete is called whenever deletion of an existing endpoints
- // object is observed.
- func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
- proxier.OnEndpointsUpdate(endpoints, nil)
- }
- // OnEndpointsSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- func (proxier *Proxier) OnEndpointsSynced() {
- proxier.mu.Lock()
- proxier.endpointsSynced = true
- proxier.setInitialized(proxier.servicesSynced)
- proxier.mu.Unlock()
- // Sync unconditionally - this is called once per lifetime.
- proxier.syncProxyRules()
- }
- // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
- // is observed.
- func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
- if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
- proxier.Sync()
- }
- }
- // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
- // slice object is observed.
- func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
- if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
- proxier.Sync()
- }
- }
- // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
- // object is observed.
- func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
- if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
- proxier.Sync()
- }
- }
- // OnEndpointSlicesSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- func (proxier *Proxier) OnEndpointSlicesSynced() {
- proxier.mu.Lock()
- proxier.endpointSlicesSynced = true
- proxier.setInitialized(proxier.servicesSynced)
- proxier.mu.Unlock()
- // Sync unconditionally - this is called once per lifetime.
- proxier.syncProxyRules()
- }
- // OnNodeAdd is called whenever creation of new node object
- // is observed.
- func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
- if node.Name != proxier.hostname {
- klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
- return
- }
- if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
- return
- }
- proxier.mu.Lock()
- proxier.nodeLabels = node.Labels
- proxier.mu.Unlock()
- proxier.syncProxyRules()
- }
- // OnNodeUpdate is called whenever modification of an existing
- // node object is observed.
- func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
- if node.Name != proxier.hostname {
- klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
- return
- }
- if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
- return
- }
- proxier.mu.Lock()
- proxier.nodeLabels = node.Labels
- proxier.mu.Unlock()
- proxier.syncProxyRules()
- }
- // OnNodeDelete is called whever deletion of an existing node
- // object is observed.
- func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
- if node.Name != proxier.hostname {
- klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
- return
- }
- proxier.mu.Lock()
- proxier.nodeLabels = nil
- proxier.mu.Unlock()
- proxier.syncProxyRules()
- }
- // OnNodeSynced is called once all the initial event handlers were
- // called and the state is fully propagated to local cache.
- func (proxier *Proxier) OnNodeSynced() {
- }
// portProtoHash returns a 16-character hash of a service port name and
// protocol: the sha256 digest of their concatenation, base32-encoded (standard
// alphabet) and truncated. iptables chain names must be <= 28 chars long, and
// shorter names are easier to read.
func portProtoHash(servicePortName string, protocol string) string {
	digest := sha256.Sum256([]byte(servicePortName + protocol))
	return base32.StdEncoding.EncodeToString(digest[:])[:16]
}
- // servicePortChainName takes the ServicePortName for a service and
- // returns the associated iptables chain. This is computed by hashing (sha256)
- // then encoding to base32 and truncating with the prefix "KUBE-SVC-".
- func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain {
- return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol))
- }
- // serviceFirewallChainName takes the ServicePortName for a service and
- // returns the associated iptables chain. This is computed by hashing (sha256)
- // then encoding to base32 and truncating with the prefix "KUBE-FW-".
- func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
- return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol))
- }
- // serviceLBPortChainName takes the ServicePortName for a service and
- // returns the associated iptables chain. This is computed by hashing (sha256)
- // then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do
- // this because IPTables Chain Names must be <= 28 chars long, and the longer
- // they are the harder they are to read.
- func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain {
- return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol))
- }
- // This is the same as servicePortChainName but with the endpoint included.
- func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain {
- hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
- encoded := base32.StdEncoding.EncodeToString(hash[:])
- return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
- }
- // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
- // risk sending more traffic to it, all of which will be lost (because UDP).
- // This assumes the proxier mutex is held
- // TODO: move it to util
- func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
- for _, epSvcPair := range connectionMap {
- if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
- endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
- nodePort := svcInfo.NodePort()
- var err error
- if nodePort != 0 {
- err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
- } else {
- err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
- }
- if err != nil {
- klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
- }
- for _, extIP := range svcInfo.ExternalIPStrings() {
- err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
- if err != nil {
- klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
- }
- }
- for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
- err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
- if err != nil {
- klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
- }
- }
- }
- }
- }
- const endpointChainsNumberThreshold = 1000
- // Assumes proxier.mu is held.
- func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
- // Not printing these comments, can reduce size of iptables (in case of large
- // number of endpoints) even by 40%+. So if total number of endpoint chains
- // is large enough, we simply drop those comments.
- if proxier.endpointChainsNumber > endpointChainsNumberThreshold {
- return args
- }
- return append(args, "-m", "comment", "--comment", svcName)
- }
- // This is where all of the iptables-save/restore calls happen.
- // The only other iptables rules are those that are setup in iptablesInit()
- // This assumes proxier.mu is NOT held
- func (proxier *Proxier) syncProxyRules() {
- proxier.mu.Lock()
- defer proxier.mu.Unlock()
- // don't sync rules till we've received services and endpoints
- if !proxier.isInitialized() {
- klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
- return
- }
- // Keep track of how long syncs take.
- start := time.Now()
- defer func() {
- metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
- klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
- }()
- // We assume that if this was called, we really want to sync them,
- // even if nothing changed in the meantime. In other words, callers are
- // responsible for detecting no-op changes and not calling this function.
- serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
- endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
- staleServices := serviceUpdateResult.UDPStaleClusterIP
- // merge stale services gathered from updateEndpointsMap
- for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
- if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
- klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
- staleServices.Insert(svcInfo.ClusterIP().String())
- for _, extIP := range svcInfo.ExternalIPStrings() {
- staleServices.Insert(extIP)
- }
- }
- }
- klog.V(3).Info("Syncing iptables rules")
- success := false
- defer func() {
- if !success {
- klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod)
- proxier.syncRunner.RetryAfter(proxier.syncPeriod)
- }
- }()
- // Create and link the kube chains.
- for _, jump := range iptablesJumpChains {
- if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
- klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err)
- return
- }
- args := append(jump.extraArgs,
- "-m", "comment", "--comment", jump.comment,
- "-j", string(jump.dstChain),
- )
- if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
- klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err)
- return
- }
- }
- //
- // Below this point we will not return until we try to write the iptables rules.
- //
- // Get iptables-save output so we can check for existing chains and rules.
- // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
- existingFilterChains := make(map[utiliptables.Chain][]byte)
- proxier.existingFilterChainsData.Reset()
- err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
- if err != nil { // if we failed to get any rules
- klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
- } else { // otherwise parse the output
- existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
- }
- // IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
- existingNATChains := make(map[utiliptables.Chain][]byte)
- proxier.iptablesData.Reset()
- err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
- if err != nil { // if we failed to get any rules
- klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
- } else { // otherwise parse the output
- existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
- }
- // Reset all buffers used later.
- // This is to avoid memory reallocations and thus improve performance.
- proxier.filterChains.Reset()
- proxier.filterRules.Reset()
- proxier.natChains.Reset()
- proxier.natRules.Reset()
- // Write table headers.
- writeLine(proxier.filterChains, "*filter")
- writeLine(proxier.natChains, "*nat")
- // Make sure we keep stats for the top-level chains, if they existed
- // (which most should have because we created them above).
- for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
- if chain, ok := existingFilterChains[chainName]; ok {
- writeBytesLine(proxier.filterChains, chain)
- } else {
- writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
- }
- }
- for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
- if chain, ok := existingNATChains[chainName]; ok {
- writeBytesLine(proxier.natChains, chain)
- } else {
- writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
- }
- }
- // Install the kubernetes-specific postrouting rules. We use a whole chain for
- // this so that it is easier to flush and change, for example if the mark
- // value should ever change.
- // NB: THIS MUST MATCH the corresponding code in the kubelet
- masqRule := []string{
- "-A", string(kubePostroutingChain),
- "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
- "-m", "mark", "--mark", proxier.masqueradeMark,
- "-j", "MASQUERADE",
- }
- if proxier.iptables.HasRandomFully() {
- masqRule = append(masqRule, "--random-fully")
- klog.V(3).Info("Using `--random-fully` in the MASQUERADE rule for iptables")
- } else {
- klog.V(3).Info("Not using `--random-fully` in the MASQUERADE rule for iptables because the local version of iptables does not support it")
- }
- writeLine(proxier.natRules, masqRule...)
- // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
- // this so that it is easier to flush and change, for example if the mark
- // value should ever change.
- writeLine(proxier.natRules, []string{
- "-A", string(KubeMarkMasqChain),
- "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
- }...)
- // Accumulate NAT chains to keep.
- activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
- // Accumulate the set of local ports that we will be holding open once this update is complete
- replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
- // We are creating those slices ones here to avoid memory reallocations
- // in every loop. Note that reuse the memory, instead of doing:
- // slice = <some new slice>
- // you should always do one of the below:
- // slice = slice[:0] // and then append to it
- // slice = append(slice[:0], ...)
- endpoints := make([]*endpointsInfo, 0)
- endpointChains := make([]utiliptables.Chain, 0)
- // To avoid growing this slice, we arbitrarily set its size to 64,
- // there is never more than that many arguments for a single line.
- // Note that even if we go over 64, it will still be correct - it
- // is just for efficiency, not correctness.
- args := make([]string, 64)
- // Compute total number of endpoint chains across all services.
- proxier.endpointChainsNumber = 0
- for svcName := range proxier.serviceMap {
- proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
- }
- // Build rules for each service.
- for svcName, svc := range proxier.serviceMap {
- svcInfo, ok := svc.(*serviceInfo)
- if !ok {
- klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
- continue
- }
- isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
- protocol := strings.ToLower(string(svcInfo.Protocol()))
- svcNameString := svcInfo.serviceNameString
- allEndpoints := proxier.endpointsMap[svcName]
- hasEndpoints := len(allEndpoints) > 0
- // Service Topology will not be enabled in the following cases:
- // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
- // 2. ServiceTopology is not enabled.
- // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
- // to get topology information).
- if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
- allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
- hasEndpoints = len(allEndpoints) > 0
- }
- svcChain := svcInfo.servicePortChainName
- if hasEndpoints {
- // Create the per-service chain, retaining counters if possible.
- if chain, ok := existingNATChains[svcChain]; ok {
- writeBytesLine(proxier.natChains, chain)
- } else {
- writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
- }
- activeNATChains[svcChain] = true
- }
- svcXlbChain := svcInfo.serviceLBChainName
- if svcInfo.OnlyNodeLocalEndpoints() {
- // Only for services request OnlyLocal traffic
- // create the per-service LB chain, retaining counters if possible.
- if lbChain, ok := existingNATChains[svcXlbChain]; ok {
- writeBytesLine(proxier.natChains, lbChain)
- } else {
- writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
- }
- activeNATChains[svcXlbChain] = true
- }
- // Capture the clusterIP.
- if hasEndpoints {
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
- "--dport", strconv.Itoa(svcInfo.Port()),
- )
- if proxier.masqueradeAll {
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
- } else if len(proxier.clusterCIDR) > 0 {
- // This masquerades off-cluster traffic to a service VIP. The idea
- // is that you can establish a static route for your Service range,
- // routing to any node, and that node will bridge into the Service
- // for you. Since that might bounce off-node, we masquerade here.
- // If/when we support "Local" policy for VIPs, we should update this.
- writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
- }
- writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
- } else {
- // No endpoints.
- writeLine(proxier.filterRules,
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
- "--dport", strconv.Itoa(svcInfo.Port()),
- "-j", "REJECT",
- )
- }
- // Capture externalIPs.
- for _, externalIP := range svcInfo.ExternalIPStrings() {
- // If the "external" IP happens to be an IP that is local to this
- // machine, hold the local port open so no other process can open it
- // (because the socket might open but it would never work).
- if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
- klog.Errorf("can't determine if IP is local, assuming not: %v", err)
- } else if local && (svcInfo.Protocol() != v1.ProtocolSCTP) {
- lp := utilproxy.LocalPort{
- Description: "externalIP for " + svcNameString,
- IP: externalIP,
- Port: svcInfo.Port(),
- Protocol: protocol,
- }
- if proxier.portsMap[lp] != nil {
- klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
- replacementPortsMap[lp] = proxier.portsMap[lp]
- } else {
- socket, err := proxier.portMapper.OpenLocalPort(&lp)
- if err != nil {
- msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
- proxier.recorder.Eventf(
- &v1.ObjectReference{
- Kind: "Node",
- Name: proxier.hostname,
- UID: types.UID(proxier.hostname),
- Namespace: "",
- }, v1.EventTypeWarning, err.Error(), msg)
- klog.Error(msg)
- continue
- }
- replacementPortsMap[lp] = socket
- }
- }
- if hasEndpoints {
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
- "--dport", strconv.Itoa(svcInfo.Port()),
- )
- // We have to SNAT packets to external IPs.
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
- // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
- // nor from a local process to be forwarded to the service.
- // This rule roughly translates to "all traffic from off-machine".
- // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
- externalTrafficOnlyArgs := append(args,
- "-m", "physdev", "!", "--physdev-is-in",
- "-m", "addrtype", "!", "--src-type", "LOCAL")
- writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
- dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
- // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
- // This covers cases like GCE load-balancers which get added to the local routing table.
- writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
- } else {
- // No endpoints.
- writeLine(proxier.filterRules,
- "-A", string(kubeExternalServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
- "--dport", strconv.Itoa(svcInfo.Port()),
- "-j", "REJECT",
- )
- }
- }
- // Capture load-balancer ingress.
- fwChain := svcInfo.serviceFirewallChainName
- for _, ingress := range svcInfo.LoadBalancerIPStrings() {
- if ingress != "" {
- if hasEndpoints {
- // create service firewall chain
- if chain, ok := existingNATChains[fwChain]; ok {
- writeBytesLine(proxier.natChains, chain)
- } else {
- writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
- }
- activeNATChains[fwChain] = true
- // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
- // This currently works for loadbalancers that preserves source ips.
- // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
- "--dport", strconv.Itoa(svcInfo.Port()),
- )
- // jump to service firewall chain
- writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
- args = append(args[:0],
- "-A", string(fwChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
- )
- // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
- chosenChain := svcXlbChain
- // If we are proxying globally, we need to masquerade in case we cross nodes.
- // If we are proxying only locally, we can retain the source IP.
- if !svcInfo.OnlyNodeLocalEndpoints() {
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
- chosenChain = svcChain
- }
- if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
- // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
- writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
- } else {
- // firewall filter based on each source range
- allowFromNode := false
- for _, src := range svcInfo.LoadBalancerSourceRanges() {
- writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
- // ignore error because it has been validated
- _, cidr, _ := net.ParseCIDR(src)
- if cidr.Contains(proxier.nodeIP) {
- allowFromNode = true
- }
- }
- // generally, ip route rule was added to intercept request to loadbalancer vip from the
- // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
- // Need to add the following rule to allow request on host.
- if allowFromNode {
- writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...)
- }
- }
- // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
- // It means the packet cannot go thru the firewall, then mark it for DROP
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
- } else {
- // No endpoints.
- writeLine(proxier.filterRules,
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
- "-m", protocol, "-p", protocol,
- "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
- "--dport", strconv.Itoa(svcInfo.Port()),
- "-j", "REJECT",
- )
- }
- }
- }
- // Capture nodeports. If we had more than 2 rules it might be
- // worthwhile to make a new per-service chain for nodeport rules, but
- // with just 2 rules it ends up being a waste and a cognitive burden.
- if svcInfo.NodePort() != 0 {
- // Hold the local port open so no other process can open it
- // (because the socket might open but it would never work).
- addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
- if err != nil {
- klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
- continue
- }
- lps := make([]utilproxy.LocalPort, 0)
- for address := range addresses {
- lp := utilproxy.LocalPort{
- Description: "nodePort for " + svcNameString,
- IP: address,
- Port: svcInfo.NodePort(),
- Protocol: protocol,
- }
- if utilproxy.IsZeroCIDR(address) {
- // Empty IP address means all
- lp.IP = ""
- lps = append(lps, lp)
- // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
- break
- }
- lps = append(lps, lp)
- }
- // For ports on node IPs, open the actual port and hold it.
- for _, lp := range lps {
- if proxier.portsMap[lp] != nil {
- klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
- replacementPortsMap[lp] = proxier.portsMap[lp]
- } else if svcInfo.Protocol() != v1.ProtocolSCTP {
- socket, err := proxier.portMapper.OpenLocalPort(&lp)
- if err != nil {
- klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
- continue
- }
- if lp.Protocol == "udp" {
- // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
- // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
- // This only affects UDP connections, which are not common.
- // See issue: https://github.com/kubernetes/kubernetes/issues/49881
- err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
- if err != nil {
- klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
- }
- }
- replacementPortsMap[lp] = socket
- }
- }
- if hasEndpoints {
- args = append(args[:0],
- "-A", string(kubeNodePortsChain),
- "-m", "comment", "--comment", svcNameString,
- "-m", protocol, "-p", protocol,
- "--dport", strconv.Itoa(svcInfo.NodePort()),
- )
- if !svcInfo.OnlyNodeLocalEndpoints() {
- // Nodeports need SNAT, unless they're local.
- writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
- // Jump to the service chain.
- writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
- } else {
- // TODO: Make all nodePorts jump to the firewall chain.
- // Currently we only create it for loadbalancers (#33586).
- // Fix localhost martian source error
- loopback := "127.0.0.0/8"
- if isIPv6 {
- loopback = "::1/128"
- }
- writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
- writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
- }
- } else {
- // No endpoints.
- writeLine(proxier.filterRules,
- "-A", string(kubeExternalServicesChain),
- "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
- "-m", "addrtype", "--dst-type", "LOCAL",
- "-m", protocol, "-p", protocol,
- "--dport", strconv.Itoa(svcInfo.NodePort()),
- "-j", "REJECT",
- )
- }
- }
- if !hasEndpoints {
- continue
- }
- // Generate the per-endpoint chains. We do this in multiple passes so we
- // can group rules together.
- // These two slices parallel each other - keep in sync
- endpoints = endpoints[:0]
- endpointChains = endpointChains[:0]
- var endpointChain utiliptables.Chain
- for _, ep := range allEndpoints {
- epInfo, ok := ep.(*endpointsInfo)
- if !ok {
- klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
- continue
- }
- endpoints = append(endpoints, epInfo)
- endpointChain = epInfo.endpointChain(svcNameString, protocol)
- endpointChains = append(endpointChains, endpointChain)
- // Create the endpoint chain, retaining counters if possible.
- if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
- writeBytesLine(proxier.natChains, chain)
- } else {
- writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
- }
- activeNATChains[endpointChain] = true
- }
- // First write session affinity rules, if applicable.
- if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
- for _, endpointChain := range endpointChains {
- args = append(args[:0],
- "-A", string(svcChain),
- )
- args = proxier.appendServiceCommentLocked(args, svcNameString)
- args = append(args,
- "-m", "recent", "--name", string(endpointChain),
- "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
- "-j", string(endpointChain),
- )
- writeLine(proxier.natRules, args...)
- }
- }
- // Now write loadbalancing & DNAT rules.
- n := len(endpointChains)
- localEndpointChains := make([]utiliptables.Chain, 0)
- for i, endpointChain := range endpointChains {
- // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
- if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
- localEndpointChains = append(localEndpointChains, endpointChains[i])
- }
- epIP := endpoints[i].IP()
- if epIP == "" {
- // Error parsing this endpoint has been logged. Skip to next endpoint.
- continue
- }
- // Balancing rules in the per-service chain.
- args = append(args[:0], "-A", string(svcChain))
- args = proxier.appendServiceCommentLocked(args, svcNameString)
- if i < (n - 1) {
- // Each rule is a probabilistic match.
- args = append(args,
- "-m", "statistic",
- "--mode", "random",
- "--probability", proxier.probability(n-i))
- }
- // The final (or only if n == 1) rule is a guaranteed match.
- args = append(args, "-j", string(endpointChain))
- writeLine(proxier.natRules, args...)
- // Rules in the per-endpoint chain.
- args = append(args[:0], "-A", string(endpointChain))
- args = proxier.appendServiceCommentLocked(args, svcNameString)
- // Handle traffic that loops back to the originator with SNAT.
- writeLine(proxier.natRules, append(args,
- "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
- "-j", string(KubeMarkMasqChain))...)
- // Update client-affinity lists.
- if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
- args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
- }
- // DNAT to final destination.
- args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
- writeLine(proxier.natRules, args...)
- }
- // The logic below this applies only if this service is marked as OnlyLocal
- if !svcInfo.OnlyNodeLocalEndpoints() {
- continue
- }
- // First rule in the chain redirects all pod -> external VIP traffic to the
- // Service's ClusterIP instead. This happens whether or not we have local
- // endpoints; only if clusterCIDR is specified
- if len(proxier.clusterCIDR) > 0 {
- args = append(args[:0],
- "-A", string(svcXlbChain),
- "-m", "comment", "--comment",
- `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
- "-s", proxier.clusterCIDR,
- "-j", string(svcChain),
- )
- writeLine(proxier.natRules, args...)
- }
- // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
- // This allows traffic originating from the host to be redirected to the service correctly,
- // otherwise traffic to LB IPs are dropped if there are no local endpoints.
- args = append(args[:0], "-A", string(svcXlbChain))
- writeLine(proxier.natRules, append(args,
- "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
- "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...)
- writeLine(proxier.natRules, append(args,
- "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
- "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
- numLocalEndpoints := len(localEndpointChains)
- if numLocalEndpoints == 0 {
- // Blackhole all traffic since there are no local endpoints
- args = append(args[:0],
- "-A", string(svcXlbChain),
- "-m", "comment", "--comment",
- fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
- "-j",
- string(KubeMarkDropChain),
- )
- writeLine(proxier.natRules, args...)
- } else {
- // First write session affinity rules only over local endpoints, if applicable.
- if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
- for _, endpointChain := range localEndpointChains {
- writeLine(proxier.natRules,
- "-A", string(svcXlbChain),
- "-m", "comment", "--comment", svcNameString,
- "-m", "recent", "--name", string(endpointChain),
- "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
- "-j", string(endpointChain))
- }
- }
- // Setup probability filter rules only over local endpoints
- for i, endpointChain := range localEndpointChains {
- // Balancing rules in the per-service chain.
- args = append(args[:0],
- "-A", string(svcXlbChain),
- "-m", "comment", "--comment",
- fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
- )
- if i < (numLocalEndpoints - 1) {
- // Each rule is a probabilistic match.
- args = append(args,
- "-m", "statistic",
- "--mode", "random",
- "--probability", proxier.probability(numLocalEndpoints-i))
- }
- // The final (or only if n == 1) rule is a guaranteed match.
- args = append(args, "-j", string(endpointChain))
- writeLine(proxier.natRules, args...)
- }
- }
- }
- // Delete chains no longer in use.
- for chain := range existingNATChains {
- if !activeNATChains[chain] {
- chainString := string(chain)
- if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
- // Ignore chains that aren't ours.
- continue
- }
- // We must (as per iptables) write a chain-line for it, which has
- // the nice effect of flushing the chain. Then we can remove the
- // chain.
- writeBytesLine(proxier.natChains, existingNATChains[chain])
- writeLine(proxier.natRules, "-X", chainString)
- }
- }
- // Finally, tail-call to the nodeports chain. This needs to be after all
- // other service portal rules.
- addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
- if err != nil {
- klog.Errorf("Failed to get node ip address matching nodeport cidr")
- } else {
- isIPv6 := proxier.iptables.IsIpv6()
- for address := range addresses {
- // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
- if utilproxy.IsZeroCIDR(address) {
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
- "-m", "addrtype", "--dst-type", "LOCAL",
- "-j", string(kubeNodePortsChain))
- writeLine(proxier.natRules, args...)
- // Nothing else matters after the zero CIDR.
- break
- }
- // Ignore IP addresses with incorrect version
- if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
- klog.Errorf("IP address %s has incorrect IP version", address)
- continue
- }
- // create nodeport rules for each IP one by one
- args = append(args[:0],
- "-A", string(kubeServicesChain),
- "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
- "-d", address,
- "-j", string(kubeNodePortsChain))
- writeLine(proxier.natRules, args...)
- }
- }
- // Drop the packets in INVALID state, which would potentially cause
- // unexpected connection reset.
- // https://github.com/kubernetes/kubernetes/issues/74839
- writeLine(proxier.filterRules,
- "-A", string(kubeForwardChain),
- "-m", "conntrack",
- "--ctstate", "INVALID",
- "-j", "DROP",
- )
- // If the masqueradeMark has been added then we want to forward that same
- // traffic, this allows NodePort traffic to be forwarded even if the default
- // FORWARD policy is not accept.
- writeLine(proxier.filterRules,
- "-A", string(kubeForwardChain),
- "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
- "-m", "mark", "--mark", proxier.masqueradeMark,
- "-j", "ACCEPT",
- )
- // The following rules can only be set if clusterCIDR has been defined.
- if len(proxier.clusterCIDR) != 0 {
- // The following two rules ensure the traffic after the initial packet
- // accepted by the "kubernetes forwarding rules" rule above will be
- // accepted, to be as specific as possible the traffic must be sourced
- // or destined to the clusterCIDR (to/from a pod).
- writeLine(proxier.filterRules,
- "-A", string(kubeForwardChain),
- "-s", proxier.clusterCIDR,
- "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
- "-m", "conntrack",
- "--ctstate", "RELATED,ESTABLISHED",
- "-j", "ACCEPT",
- )
- writeLine(proxier.filterRules,
- "-A", string(kubeForwardChain),
- "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
- "-d", proxier.clusterCIDR,
- "-m", "conntrack",
- "--ctstate", "RELATED,ESTABLISHED",
- "-j", "ACCEPT",
- )
- }
- // Write the end-of-table markers.
- writeLine(proxier.filterRules, "COMMIT")
- writeLine(proxier.natRules, "COMMIT")
- // Sync rules.
- // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
- proxier.iptablesData.Reset()
- proxier.iptablesData.Write(proxier.filterChains.Bytes())
- proxier.iptablesData.Write(proxier.filterRules.Bytes())
- proxier.iptablesData.Write(proxier.natChains.Bytes())
- proxier.iptablesData.Write(proxier.natRules.Bytes())
- klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
- err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
- if err != nil {
- klog.Errorf("Failed to execute iptables-restore: %v", err)
- metrics.IptablesRestoreFailuresTotal.Inc()
- // Revert new local ports.
- klog.V(2).Infof("Closing local ports after iptables-restore failure")
- utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
- return
- }
- success = true
- for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
- for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
- latency := metrics.SinceInSeconds(lastChangeTriggerTime)
- metrics.NetworkProgrammingLatency.Observe(latency)
- klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
- }
- }
- // Close old local ports and save new ones.
- for k, v := range proxier.portsMap {
- if replacementPortsMap[k] == nil {
- v.Close()
- }
- }
- proxier.portsMap = replacementPortsMap
- if proxier.healthzServer != nil {
- proxier.healthzServer.Updated()
- }
- metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
- // Update service healthchecks. The endpoints list might include services that are
- // not "OnlyLocal", but the services list will not, and the serviceHealthServer
- // will just drop those endpoints.
- if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
- klog.Errorf("Error syncing healthcheck services: %v", err)
- }
- if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
- klog.Errorf("Error syncing healthcheck endpoints: %v", err)
- }
- // Finish housekeeping.
- // TODO: these could be made more consistent.
- for _, svcIP := range staleServices.UnsortedList() {
- if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
- klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
- }
- }
- proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
- }
// writeLine writes words to buf separated by single spaces and terminated by
// a newline. strings.Join is avoided on purpose so that no intermediate
// string is allocated for every rule line. Calling it with no words writes
// nothing at all, not even the newline.
func writeLine(buf *bytes.Buffer, words ...string) {
	if len(words) == 0 {
		return
	}
	// Emit the first word bare; each later word is preceded by its separator.
	buf.WriteString(words[0])
	for _, word := range words[1:] {
		buf.WriteByte(' ')
		buf.WriteString(word)
	}
	buf.WriteByte('\n')
}
// writeBytesLine appends line to buf verbatim and then terminates it with a
// single '\n'. It is the []byte counterpart of writeLine for content that is
// already a complete, pre-formatted line. No validation is done: if line
// already ends in a newline, the result contains a blank line.
//
// The parameter is deliberately not named "bytes", which would shadow the
// bytes package inside this function.
func writeBytesLine(buf *bytes.Buffer, line []byte) {
	buf.Write(line)
	buf.WriteByte('\n')
}
- func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
- // For ports on node IPs, open the actual port and hold it, even though we
- // use iptables to redirect traffic.
- // This ensures a) that it's safe to use that port and b) that (a) stays
- // true. The risk is that some process on the node (e.g. sshd or kubelet)
- // is using a port and we give that same port out to a Service. That would
- // be bad because iptables would silently claim the traffic but the process
- // would never know.
- // NOTE: We should not need to have a real listen()ing socket - bind()
- // should be enough, but I can't figure out a way to e2e test without
- // it. Tools like 'ss' and 'netstat' do not show sockets that are
- // bind()ed but not listen()ed, and at least the default debian netcat
- // has no way to avoid about 10 seconds of retries.
- var socket utilproxy.Closeable
- switch lp.Protocol {
- case "tcp":
- listener, err := net.Listen("tcp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
- if err != nil {
- return nil, err
- }
- socket = listener
- case "udp":
- addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
- if err != nil {
- return nil, err
- }
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return nil, err
- }
- socket = conn
- default:
- return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
- }
- klog.V(2).Infof("Opened local port %s", lp.String())
- return socket, nil
- }
|