proxier.go 49 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "fmt"
  16. "net"
  17. "reflect"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. v1 "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/types"
  25. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  26. utilnet "k8s.io/apimachinery/pkg/util/net"
  27. "k8s.io/apimachinery/pkg/util/runtime"
  28. "k8s.io/apimachinery/pkg/util/sets"
  29. servicehelper "k8s.io/cloud-provider/service/helpers"
  30. "k8s.io/klog"
  31. "k8s.io/kubernetes/pkg/proxy"
  32. "k8s.io/kubernetes/pkg/proxy/config"
  33. utilproxy "k8s.io/kubernetes/pkg/proxy/util"
  34. "k8s.io/kubernetes/pkg/util/async"
  35. "k8s.io/kubernetes/pkg/util/conntrack"
  36. "k8s.io/kubernetes/pkg/util/iptables"
  37. utilexec "k8s.io/utils/exec"
  38. netutils "k8s.io/utils/net"
  39. )
  40. type portal struct {
  41. ip net.IP
  42. port int
  43. isExternal bool
  44. }
  45. // ServiceInfo contains information and state for a particular proxied service
  46. type ServiceInfo struct {
  47. // Timeout is the read/write timeout (used for UDP connections)
  48. Timeout time.Duration
  49. // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
  50. ActiveClients *ClientCache
  51. isAliveAtomic int32 // Only access this with atomic ops
  52. portal portal
  53. protocol v1.Protocol
  54. proxyPort int
  55. socket ProxySocket
  56. nodePort int
  57. loadBalancerStatus v1.LoadBalancerStatus
  58. sessionAffinityType v1.ServiceAffinity
  59. stickyMaxAgeSeconds int
  60. // Deprecated, but required for back-compat (including e2e)
  61. externalIPs []string
  62. }
  63. func (info *ServiceInfo) setAlive(b bool) {
  64. var i int32
  65. if b {
  66. i = 1
  67. }
  68. atomic.StoreInt32(&info.isAliveAtomic, i)
  69. }
  70. func (info *ServiceInfo) IsAlive() bool {
  71. return atomic.LoadInt32(&info.isAliveAtomic) != 0
  72. }
  73. func logTimeout(err error) bool {
  74. if e, ok := err.(net.Error); ok {
  75. if e.Timeout() {
  76. klog.V(3).Infof("connection to endpoint closed due to inactivity")
  77. return true
  78. }
  79. }
  80. return false
  81. }
  82. // ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
  83. type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error)
  84. const numBurstSyncs int = 2
  85. type serviceChange struct {
  86. current *v1.Service
  87. previous *v1.Service
  88. }
  89. // Interface for async runner; abstracted for testing
  90. type asyncRunnerInterface interface {
  91. Run()
  92. Loop(<-chan struct{})
  93. }
  94. // Proxier is a simple proxy for TCP connections between a localhost:lport
  95. // and services that provide the actual implementations.
  96. type Proxier struct {
  97. // EndpointSlice support has not been added for this proxier yet.
  98. config.NoopEndpointSliceHandler
  99. // TODO(imroc): implement node handler for userspace proxier.
  100. config.NoopNodeHandler
  101. loadBalancer LoadBalancer
  102. mu sync.Mutex // protects serviceMap
  103. serviceMap map[proxy.ServicePortName]*ServiceInfo
  104. syncPeriod time.Duration
  105. minSyncPeriod time.Duration
  106. udpIdleTimeout time.Duration
  107. portMapMutex sync.Mutex
  108. portMap map[portMapKey]*portMapValue
  109. numProxyLoops int32 // use atomic ops to access this; mostly for testing
  110. listenIP net.IP
  111. iptables iptables.Interface
  112. hostIP net.IP
  113. localAddrs netutils.IPSet
  114. proxyPorts PortAllocator
  115. makeProxySocket ProxySocketFunc
  116. exec utilexec.Interface
  117. // endpointsSynced and servicesSynced are set to 1 when the corresponding
  118. // objects are synced after startup. This is used to avoid updating iptables
  119. // with some partial data after kube-proxy restart.
  120. endpointsSynced int32
  121. servicesSynced int32
  122. initialized int32
  123. // protects serviceChanges
  124. serviceChangesLock sync.Mutex
  125. serviceChanges map[types.NamespacedName]*serviceChange // map of service changes
  126. syncRunner asyncRunnerInterface // governs calls to syncProxyRules
  127. stopChan chan struct{}
  128. }
  129. // assert Proxier is a proxy.Provider
  130. var _ proxy.Provider = &Proxier{}
  131. // A key for the portMap. The ip has to be a string because slices can't be map
  132. // keys.
  133. type portMapKey struct {
  134. ip string
  135. port int
  136. protocol v1.Protocol
  137. }
  138. func (k *portMapKey) String() string {
  139. return fmt.Sprintf("%s/%s", net.JoinHostPort(k.ip, strconv.Itoa(k.port)), k.protocol)
  140. }
  141. // A value for the portMap
  142. type portMapValue struct {
  143. owner proxy.ServicePortName
  144. socket interface {
  145. Close() error
  146. }
  147. }
  148. var (
  149. // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
  150. // the loopback address. May be checked for by callers of NewProxier to know whether
  151. // the caller provided invalid input.
  152. ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
  153. )
  154. // NewProxier returns a new Proxier given a LoadBalancer and an address on
  155. // which to listen. Because of the iptables logic, It is assumed that there
  156. // is only a single Proxier active on a machine. An error will be returned if
  157. // the proxier cannot be started due to an invalid ListenIP (loopback) or
  158. // if iptables fails to update or acquire the initial lock. Once a proxier is
  159. // created, it will keep iptables up to date in the background and will not
  160. // terminate if a particular iptables call fails.
  161. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, nodePortAddresses []string) (*Proxier, error) {
  162. return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, nodePortAddresses, newProxySocket)
  163. }
  164. // NewCustomProxier functions similarly to NewProxier, returning a new Proxier
  165. // for the given LoadBalancer and address. The new proxier is constructed using
  166. // the ProxySocket constructor provided, however, instead of constructing the
  167. // default ProxySockets.
  168. func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, nodePortAddresses []string, makeProxySocket ProxySocketFunc) (*Proxier, error) {
  169. if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
  170. return nil, ErrProxyOnLocalhost
  171. }
  172. // If listenIP is given, assume that is the intended host IP. Otherwise
  173. // try to find a suitable host IP address from network interfaces.
  174. var err error
  175. hostIP := listenIP
  176. if hostIP.Equal(net.IPv4zero) || hostIP.Equal(net.IPv6zero) {
  177. hostIP, err = utilnet.ChooseHostInterface()
  178. if err != nil {
  179. return nil, fmt.Errorf("failed to select a host interface: %v", err)
  180. }
  181. }
  182. err = setRLimit(64 * 1000)
  183. if err != nil {
  184. return nil, fmt.Errorf("failed to set open file handler limit: %v", err)
  185. }
  186. proxyPorts := newPortAllocator(pr)
  187. klog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
  188. return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
  189. }
  190. func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
  191. // convenient to pass nil for tests..
  192. if proxyPorts == nil {
  193. proxyPorts = newPortAllocator(utilnet.PortRange{})
  194. }
  195. // Set up the iptables foundations we need.
  196. if err := iptablesInit(iptables); err != nil {
  197. return nil, fmt.Errorf("failed to initialize iptables: %v", err)
  198. }
  199. // Flush old iptables rules (since the bound ports will be invalid after a restart).
  200. // When OnUpdate() is first called, the rules will be recreated.
  201. if err := iptablesFlush(iptables); err != nil {
  202. return nil, fmt.Errorf("failed to flush iptables: %v", err)
  203. }
  204. proxier := &Proxier{
  205. loadBalancer: loadBalancer,
  206. serviceMap: make(map[proxy.ServicePortName]*ServiceInfo),
  207. serviceChanges: make(map[types.NamespacedName]*serviceChange),
  208. portMap: make(map[portMapKey]*portMapValue),
  209. syncPeriod: syncPeriod,
  210. minSyncPeriod: minSyncPeriod,
  211. udpIdleTimeout: udpIdleTimeout,
  212. listenIP: listenIP,
  213. iptables: iptables,
  214. hostIP: hostIP,
  215. proxyPorts: proxyPorts,
  216. makeProxySocket: makeProxySocket,
  217. exec: exec,
  218. stopChan: make(chan struct{}),
  219. }
  220. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs)
  221. proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
  222. return proxier, nil
  223. }
  224. // CleanupLeftovers removes all iptables rules and chains created by the Proxier
  225. // It returns true if an error was encountered. Errors are logged.
  226. func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
  227. // NOTE: Warning, this needs to be kept in sync with the userspace Proxier,
  228. // we want to ensure we remove all of the iptables rules it creates.
  229. // Currently they are all in iptablesInit()
  230. // Delete Rules first, then Flush and Delete Chains
  231. args := []string{"-m", "comment", "--comment", "handle ClusterIPs; NOTE: this must be before the NodePort rules"}
  232. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
  233. if !iptables.IsNotFoundError(err) {
  234. klog.Errorf("Error removing userspace rule: %v", err)
  235. encounteredError = true
  236. }
  237. }
  238. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
  239. if !iptables.IsNotFoundError(err) {
  240. klog.Errorf("Error removing userspace rule: %v", err)
  241. encounteredError = true
  242. }
  243. }
  244. args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
  245. args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
  246. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
  247. if !iptables.IsNotFoundError(err) {
  248. klog.Errorf("Error removing userspace rule: %v", err)
  249. encounteredError = true
  250. }
  251. }
  252. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
  253. if !iptables.IsNotFoundError(err) {
  254. klog.Errorf("Error removing userspace rule: %v", err)
  255. encounteredError = true
  256. }
  257. }
  258. args = []string{"-m", "comment", "--comment", "Ensure that non-local NodePort traffic can flow"}
  259. if err := ipt.DeleteRule(iptables.TableFilter, iptables.ChainInput, append(args, "-j", string(iptablesNonLocalNodePortChain))...); err != nil {
  260. if !iptables.IsNotFoundError(err) {
  261. klog.Errorf("Error removing userspace rule: %v", err)
  262. encounteredError = true
  263. }
  264. }
  265. // flush and delete chains.
  266. tableChains := map[iptables.Table][]iptables.Chain{
  267. iptables.TableNAT: {iptablesContainerPortalChain, iptablesHostPortalChain, iptablesHostNodePortChain, iptablesContainerNodePortChain},
  268. iptables.TableFilter: {iptablesNonLocalNodePortChain},
  269. }
  270. for table, chains := range tableChains {
  271. for _, c := range chains {
  272. // flush chain, then if successful delete, delete will fail if flush fails.
  273. if err := ipt.FlushChain(table, c); err != nil {
  274. if !iptables.IsNotFoundError(err) {
  275. klog.Errorf("Error flushing userspace chain: %v", err)
  276. encounteredError = true
  277. }
  278. } else {
  279. if err = ipt.DeleteChain(table, c); err != nil {
  280. if !iptables.IsNotFoundError(err) {
  281. klog.Errorf("Error deleting userspace chain: %v", err)
  282. encounteredError = true
  283. }
  284. }
  285. }
  286. }
  287. }
  288. return encounteredError
  289. }
  290. // shutdown closes all service port proxies and returns from the proxy's
  291. // sync loop. Used from testcases.
  292. func (proxier *Proxier) shutdown() {
  293. proxier.mu.Lock()
  294. defer proxier.mu.Unlock()
  295. for serviceName, info := range proxier.serviceMap {
  296. proxier.stopProxy(serviceName, info)
  297. }
  298. proxier.cleanupStaleStickySessions()
  299. close(proxier.stopChan)
  300. }
  301. func (proxier *Proxier) isInitialized() bool {
  302. return atomic.LoadInt32(&proxier.initialized) > 0
  303. }
  304. // Sync is called to synchronize the proxier state to iptables as soon as possible.
  305. func (proxier *Proxier) Sync() {
  306. proxier.syncRunner.Run()
  307. }
  308. func (proxier *Proxier) syncProxyRules() {
  309. start := time.Now()
  310. defer func() {
  311. klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start))
  312. }()
  313. // don't sync rules till we've received services and endpoints
  314. if !proxier.isInitialized() {
  315. klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master")
  316. return
  317. }
  318. if err := iptablesInit(proxier.iptables); err != nil {
  319. klog.Errorf("Failed to ensure iptables: %v", err)
  320. }
  321. proxier.serviceChangesLock.Lock()
  322. changes := proxier.serviceChanges
  323. proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange)
  324. proxier.serviceChangesLock.Unlock()
  325. proxier.mu.Lock()
  326. defer proxier.mu.Unlock()
  327. klog.V(2).Infof("userspace proxy: processing %d service events", len(changes))
  328. for _, change := range changes {
  329. existingPorts := proxier.mergeService(change.current)
  330. proxier.unmergeService(change.previous, existingPorts)
  331. }
  332. localAddrs, err := utilproxy.GetLocalAddrs()
  333. if err != nil {
  334. klog.Errorf("Failed to get local addresses during proxy sync: %s, assuming IPs are not local", err)
  335. } else if len(localAddrs) == 0 {
  336. klog.Warning("No local addresses were found, assuming all external IPs are not local")
  337. }
  338. localAddrSet := netutils.IPSet{}
  339. localAddrSet.Insert(localAddrs...)
  340. proxier.localAddrs = localAddrSet
  341. proxier.ensurePortals()
  342. proxier.cleanupStaleStickySessions()
  343. }
  344. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  345. func (proxier *Proxier) SyncLoop() {
  346. proxier.syncRunner.Loop(proxier.stopChan)
  347. }
  348. // Ensure that portals exist for all services.
  349. func (proxier *Proxier) ensurePortals() {
  350. // NB: This does not remove rules that should not be present.
  351. for name, info := range proxier.serviceMap {
  352. err := proxier.openPortal(name, info)
  353. if err != nil {
  354. klog.Errorf("Failed to ensure portal for %q: %v", name, err)
  355. }
  356. }
  357. }
  358. // clean up any stale sticky session records in the hash map.
  359. func (proxier *Proxier) cleanupStaleStickySessions() {
  360. for name := range proxier.serviceMap {
  361. proxier.loadBalancer.CleanupStaleStickySessions(name)
  362. }
  363. }
  364. func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error {
  365. delete(proxier.serviceMap, service)
  366. info.setAlive(false)
  367. err := info.socket.Close()
  368. port := info.socket.ListenPort()
  369. proxier.proxyPorts.Release(port)
  370. return err
  371. }
  372. func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceInfo, bool) {
  373. proxier.mu.Lock()
  374. defer proxier.mu.Unlock()
  375. info, ok := proxier.serviceMap[service]
  376. return info, ok
  377. }
  378. // addServiceOnPort lockes the proxy before calling addServiceOnPortInternal.
  379. // Used from testcases.
  380. func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
  381. proxier.mu.Lock()
  382. defer proxier.mu.Unlock()
  383. return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
  384. }
  385. // addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
  386. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
  387. // connections, for now.
  388. func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
  389. sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
  390. if err != nil {
  391. return nil, err
  392. }
  393. _, portStr, err := net.SplitHostPort(sock.Addr().String())
  394. if err != nil {
  395. sock.Close()
  396. return nil, err
  397. }
  398. portNum, err := strconv.Atoi(portStr)
  399. if err != nil {
  400. sock.Close()
  401. return nil, err
  402. }
  403. si := &ServiceInfo{
  404. Timeout: timeout,
  405. ActiveClients: newClientCache(),
  406. isAliveAtomic: 1,
  407. proxyPort: portNum,
  408. protocol: protocol,
  409. socket: sock,
  410. sessionAffinityType: v1.ServiceAffinityNone, // default
  411. }
  412. proxier.serviceMap[service] = si
  413. klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
  414. go func(service proxy.ServicePortName, proxier *Proxier) {
  415. defer runtime.HandleCrash()
  416. atomic.AddInt32(&proxier.numProxyLoops, 1)
  417. sock.ProxyLoop(service, si, proxier.loadBalancer)
  418. atomic.AddInt32(&proxier.numProxyLoops, -1)
  419. }(service, proxier)
  420. return si, nil
  421. }
  422. func (proxier *Proxier) cleanupPortalAndProxy(serviceName proxy.ServicePortName, info *ServiceInfo) error {
  423. if err := proxier.closePortal(serviceName, info); err != nil {
  424. return fmt.Errorf("Failed to close portal for %q: %v", serviceName, err)
  425. }
  426. if err := proxier.stopProxy(serviceName, info); err != nil {
  427. return fmt.Errorf("Failed to stop service %q: %v", serviceName, err)
  428. }
  429. return nil
  430. }
  431. func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
  432. if service == nil {
  433. return nil
  434. }
  435. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  436. if utilproxy.ShouldSkipService(svcName, service) {
  437. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  438. return nil
  439. }
  440. existingPorts := sets.NewString()
  441. for i := range service.Spec.Ports {
  442. servicePort := &service.Spec.Ports[i]
  443. serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  444. existingPorts.Insert(servicePort.Name)
  445. info, exists := proxier.serviceMap[serviceName]
  446. // TODO: check health of the socket? What if ProxyLoop exited?
  447. if exists && sameConfig(info, service, servicePort) {
  448. // Nothing changed.
  449. continue
  450. }
  451. if exists {
  452. klog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
  453. if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
  454. klog.Error(err)
  455. }
  456. }
  457. proxyPort, err := proxier.proxyPorts.AllocateNext()
  458. if err != nil {
  459. klog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
  460. continue
  461. }
  462. serviceIP := net.ParseIP(service.Spec.ClusterIP)
  463. klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol)
  464. info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
  465. if err != nil {
  466. klog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
  467. continue
  468. }
  469. info.portal.ip = serviceIP
  470. info.portal.port = int(servicePort.Port)
  471. info.externalIPs = service.Spec.ExternalIPs
  472. // Deep-copy in case the service instance changes
  473. info.loadBalancerStatus = *service.Status.LoadBalancer.DeepCopy()
  474. info.nodePort = int(servicePort.NodePort)
  475. info.sessionAffinityType = service.Spec.SessionAffinity
  476. // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
  477. if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
  478. info.stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
  479. }
  480. klog.V(4).Infof("info: %#v", info)
  481. if err := proxier.openPortal(serviceName, info); err != nil {
  482. klog.Errorf("Failed to open portal for %q: %v", serviceName, err)
  483. }
  484. proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds)
  485. }
  486. return existingPorts
  487. }
  488. func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.String) {
  489. if service == nil {
  490. return
  491. }
  492. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  493. if utilproxy.ShouldSkipService(svcName, service) {
  494. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  495. return
  496. }
  497. staleUDPServices := sets.NewString()
  498. for i := range service.Spec.Ports {
  499. servicePort := &service.Spec.Ports[i]
  500. if existingPorts.Has(servicePort.Name) {
  501. continue
  502. }
  503. serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  504. klog.V(1).Infof("Stopping service %q", serviceName)
  505. info, exists := proxier.serviceMap[serviceName]
  506. if !exists {
  507. klog.Errorf("Service %q is being removed but doesn't exist", serviceName)
  508. continue
  509. }
  510. if proxier.serviceMap[serviceName].protocol == v1.ProtocolUDP {
  511. staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String())
  512. }
  513. if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
  514. klog.Error(err)
  515. }
  516. proxier.loadBalancer.DeleteService(serviceName)
  517. }
  518. for _, svcIP := range staleUDPServices.UnsortedList() {
  519. if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
  520. klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
  521. }
  522. }
  523. }
  524. func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) {
  525. var svcName types.NamespacedName
  526. if current != nil {
  527. svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name}
  528. } else {
  529. svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name}
  530. }
  531. klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName)
  532. proxier.serviceChangesLock.Lock()
  533. defer proxier.serviceChangesLock.Unlock()
  534. change, exists := proxier.serviceChanges[svcName]
  535. if !exists {
  536. // change.previous is only set for new changes. We must keep
  537. // the oldest service info (or nil) because correct unmerging
  538. // depends on the next update/del after a merge, not subsequent
  539. // updates.
  540. change = &serviceChange{previous: previous}
  541. proxier.serviceChanges[svcName] = change
  542. }
  543. // Always use the most current service (or nil) as change.current
  544. change.current = current
  545. if reflect.DeepEqual(change.previous, change.current) {
  546. // collapsed change had no effect
  547. delete(proxier.serviceChanges, svcName)
  548. } else if proxier.isInitialized() {
  549. // change will have an effect, ask the proxy to sync
  550. proxier.syncRunner.Run()
  551. }
  552. }
  553. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  554. proxier.serviceChange(nil, service, "OnServiceAdd")
  555. }
  556. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  557. proxier.serviceChange(oldService, service, "OnServiceUpdate")
  558. }
  559. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  560. proxier.serviceChange(service, nil, "OnServiceDelete")
  561. }
  562. func (proxier *Proxier) OnServiceSynced() {
  563. klog.V(2).Infof("userspace OnServiceSynced")
  564. // Mark services as initialized and (if endpoints are already
  565. // initialized) the entire proxy as initialized
  566. atomic.StoreInt32(&proxier.servicesSynced, 1)
  567. if atomic.LoadInt32(&proxier.endpointsSynced) > 0 {
  568. atomic.StoreInt32(&proxier.initialized, 1)
  569. }
  570. // Must sync from a goroutine to avoid blocking the
  571. // service event handler on startup with large numbers
  572. // of initial objects
  573. go proxier.syncProxyRules()
  574. }
  575. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  576. proxier.loadBalancer.OnEndpointsAdd(endpoints)
  577. }
  578. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  579. proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
  580. }
  581. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  582. proxier.loadBalancer.OnEndpointsDelete(endpoints)
  583. }
  584. func (proxier *Proxier) OnEndpointsSynced() {
  585. klog.V(2).Infof("userspace OnEndpointsSynced")
  586. proxier.loadBalancer.OnEndpointsSynced()
  587. // Mark endpoints as initialized and (if services are already
  588. // initialized) the entire proxy as initialized
  589. atomic.StoreInt32(&proxier.endpointsSynced, 1)
  590. if atomic.LoadInt32(&proxier.servicesSynced) > 0 {
  591. atomic.StoreInt32(&proxier.initialized, 1)
  592. }
  593. // Must sync from a goroutine to avoid blocking the
  594. // service event handler on startup with large numbers
  595. // of initial objects
  596. go proxier.syncProxyRules()
  597. }
  598. func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {
  599. if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
  600. return false
  601. }
  602. if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) {
  603. return false
  604. }
  605. if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
  606. return false
  607. }
  608. if !servicehelper.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
  609. return false
  610. }
  611. if info.sessionAffinityType != service.Spec.SessionAffinity {
  612. return false
  613. }
  614. return true
  615. }
  616. func ipsEqual(lhs, rhs []string) bool {
  617. if len(lhs) != len(rhs) {
  618. return false
  619. }
  620. for i := range lhs {
  621. if lhs[i] != rhs[i] {
  622. return false
  623. }
  624. }
  625. return true
  626. }
  627. func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *ServiceInfo) error {
  628. err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
  629. if err != nil {
  630. return err
  631. }
  632. for _, publicIP := range info.externalIPs {
  633. err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)
  634. if err != nil {
  635. return err
  636. }
  637. }
  638. for _, ingress := range info.loadBalancerStatus.Ingress {
  639. if ingress.IP != "" {
  640. err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)
  641. if err != nil {
  642. return err
  643. }
  644. }
  645. }
  646. if info.nodePort != 0 {
  647. err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
  648. if err != nil {
  649. return err
  650. }
  651. }
  652. return nil
  653. }
  654. func (proxier *Proxier) openOnePortal(portal portal, protocol v1.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
  655. if proxier.localAddrs.Len() > 0 && proxier.localAddrs.Has(portal.ip) {
  656. err := proxier.claimNodePort(portal.ip, portal.port, protocol, name)
  657. if err != nil {
  658. return err
  659. }
  660. }
  661. // Handle traffic from containers.
  662. args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
  663. portalAddress := net.JoinHostPort(portal.ip.String(), strconv.Itoa(portal.port))
  664. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
  665. if err != nil {
  666. klog.Errorf("Failed to install iptables %s rule for service %q, args:%v", iptablesContainerPortalChain, name, args)
  667. return err
  668. }
  669. if !existed {
  670. klog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s", name, protocol, portalAddress)
  671. }
  672. if portal.isExternal {
  673. args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
  674. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
  675. if err != nil {
  676. klog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v", iptablesContainerPortalChain, name, args)
  677. return err
  678. }
  679. if !existed {
  680. klog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s for local traffic", name, protocol, portalAddress)
  681. }
  682. args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
  683. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
  684. if err != nil {
  685. klog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic", iptablesHostPortalChain, name)
  686. return err
  687. }
  688. if !existed {
  689. klog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s for dst-local traffic", name, protocol, portalAddress)
  690. }
  691. return nil
  692. }
  693. // Handle traffic from the host.
  694. args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
  695. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
  696. if err != nil {
  697. klog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
  698. return err
  699. }
  700. if !existed {
  701. klog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s", name, protocol, portalAddress)
  702. }
  703. return nil
  704. }
  705. // Marks a port as being owned by a particular service, or returns error if already claimed.
  706. // Idempotent: reclaiming with the same owner is not an error
  707. func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol v1.Protocol, owner proxy.ServicePortName) error {
  708. proxier.portMapMutex.Lock()
  709. defer proxier.portMapMutex.Unlock()
  710. // TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
  711. key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
  712. existing, found := proxier.portMap[key]
  713. if !found {
  714. // Hold the actual port open, even though we use iptables to redirect
  715. // it. This ensures that a) it's safe to take and b) that stays true.
  716. // NOTE: We should not need to have a real listen()ing socket - bind()
  717. // should be enough, but I can't figure out a way to e2e test without
  718. // it. Tools like 'ss' and 'netstat' do not show sockets that are
  719. // bind()ed but not listen()ed, and at least the default debian netcat
  720. // has no way to avoid about 10 seconds of retries.
  721. socket, err := proxier.makeProxySocket(protocol, ip, port)
  722. if err != nil {
  723. return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
  724. }
  725. proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
  726. klog.V(2).Infof("Claimed local port %s", key.String())
  727. return nil
  728. }
  729. if existing.owner == owner {
  730. // We are idempotent
  731. return nil
  732. }
  733. return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
  734. }
  735. // Release a claim on a port. Returns an error if the owner does not match the claim.
  736. // Tolerates release on an unclaimed port, to simplify .
  737. func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol v1.Protocol, owner proxy.ServicePortName) error {
  738. proxier.portMapMutex.Lock()
  739. defer proxier.portMapMutex.Unlock()
  740. key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
  741. existing, found := proxier.portMap[key]
  742. if !found {
  743. // We tolerate this, it happens if we are cleaning up a failed allocation
  744. klog.Infof("Ignoring release on unowned port: %v", key)
  745. return nil
  746. }
  747. if existing.owner != owner {
  748. return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
  749. }
  750. delete(proxier.portMap, key)
  751. existing.socket.Close()
  752. return nil
  753. }
  754. func (proxier *Proxier) openNodePort(nodePort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
  755. // TODO: Do we want to allow containers to access public services? Probably yes.
  756. // TODO: We could refactor this to be the same code as portal, but with IP == nil
  757. err := proxier.claimNodePort(nil, nodePort, protocol, name)
  758. if err != nil {
  759. return err
  760. }
  761. // Handle traffic from containers.
  762. args := proxier.iptablesContainerPortalArgs(nil, false, false, nodePort, protocol, proxyIP, proxyPort, name)
  763. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerNodePortChain, args...)
  764. if err != nil {
  765. klog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerNodePortChain, name)
  766. return err
  767. }
  768. if !existed {
  769. klog.Infof("Opened iptables from-containers public port for service %q on %s port %d", name, protocol, nodePort)
  770. }
  771. // Handle traffic from the host.
  772. args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  773. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostNodePortChain, args...)
  774. if err != nil {
  775. klog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostNodePortChain, name)
  776. return err
  777. }
  778. if !existed {
  779. klog.Infof("Opened iptables from-host public port for service %q on %s port %d", name, protocol, nodePort)
  780. }
  781. args = proxier.iptablesNonLocalNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  782. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableFilter, iptablesNonLocalNodePortChain, args...)
  783. if err != nil {
  784. klog.Errorf("Failed to install iptables %s rule for service %q", iptablesNonLocalNodePortChain, name)
  785. return err
  786. }
  787. if !existed {
  788. klog.Infof("Opened iptables from-non-local public port for service %q on %s port %d", name, protocol, nodePort)
  789. }
  790. return nil
  791. }
  792. func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *ServiceInfo) error {
  793. // Collect errors and report them all at the end.
  794. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
  795. for _, publicIP := range info.externalIPs {
  796. el = append(el, proxier.closeOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  797. }
  798. for _, ingress := range info.loadBalancerStatus.Ingress {
  799. if ingress.IP != "" {
  800. el = append(el, proxier.closeOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  801. }
  802. }
  803. if info.nodePort != 0 {
  804. el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  805. }
  806. if len(el) == 0 {
  807. klog.V(3).Infof("Closed iptables portals for service %q", service)
  808. } else {
  809. klog.Errorf("Some errors closing iptables portals for service %q", service)
  810. }
  811. return utilerrors.NewAggregate(el)
  812. }
  813. func (proxier *Proxier) closeOnePortal(portal portal, protocol v1.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
  814. el := []error{}
  815. if proxier.localAddrs.Len() > 0 && proxier.localAddrs.Has(portal.ip) {
  816. if err := proxier.releaseNodePort(portal.ip, portal.port, protocol, name); err != nil {
  817. el = append(el, err)
  818. }
  819. }
  820. // Handle traffic from containers.
  821. args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
  822. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
  823. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerPortalChain, name)
  824. el = append(el, err)
  825. }
  826. if portal.isExternal {
  827. args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
  828. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
  829. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerPortalChain, name)
  830. el = append(el, err)
  831. }
  832. args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
  833. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostPortalChain, args...); err != nil {
  834. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostPortalChain, name)
  835. el = append(el, err)
  836. }
  837. return el
  838. }
  839. // Handle traffic from the host (portalIP is not external).
  840. args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
  841. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostPortalChain, args...); err != nil {
  842. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostPortalChain, name)
  843. el = append(el, err)
  844. }
  845. return el
  846. }
  847. func (proxier *Proxier) closeNodePort(nodePort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
  848. el := []error{}
  849. // Handle traffic from containers.
  850. args := proxier.iptablesContainerPortalArgs(nil, false, false, nodePort, protocol, proxyIP, proxyPort, name)
  851. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerNodePortChain, args...); err != nil {
  852. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerNodePortChain, name)
  853. el = append(el, err)
  854. }
  855. // Handle traffic from the host.
  856. args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  857. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostNodePortChain, args...); err != nil {
  858. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostNodePortChain, name)
  859. el = append(el, err)
  860. }
  861. // Handle traffic not local to the host
  862. args = proxier.iptablesNonLocalNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  863. if err := proxier.iptables.DeleteRule(iptables.TableFilter, iptablesNonLocalNodePortChain, args...); err != nil {
  864. klog.Errorf("Failed to delete iptables %s rule for service %q", iptablesNonLocalNodePortChain, name)
  865. el = append(el, err)
  866. }
  867. if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil {
  868. el = append(el, err)
  869. }
  870. return el
  871. }
  872. // See comments in the *PortalArgs() functions for some details about why we
  873. // use two chains for portals.
  874. var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
  875. var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST"
  876. // Chains for NodePort services
  877. var iptablesContainerNodePortChain iptables.Chain = "KUBE-NODEPORT-CONTAINER"
  878. var iptablesHostNodePortChain iptables.Chain = "KUBE-NODEPORT-HOST"
  879. var iptablesNonLocalNodePortChain iptables.Chain = "KUBE-NODEPORT-NON-LOCAL"
  880. // Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
  881. func iptablesInit(ipt iptables.Interface) error {
  882. // TODO: There is almost certainly room for optimization here. E.g. If
  883. // we knew the service-cluster-ip-range CIDR we could fast-track outbound packets not
  884. // destined for a service. There's probably more, help wanted.
  885. // Danger - order of these rules matters here:
  886. //
  887. // We match portal rules first, then NodePort rules. For NodePort rules, we filter primarily on --dst-type LOCAL,
  888. // because we want to listen on all local addresses, but don't match internet traffic with the same dst port number.
  889. //
  890. // There is one complication (per thockin):
  891. // -m addrtype --dst-type LOCAL is what we want except that it is broken (by intent without foresight to our usecase)
  892. // on at least GCE. Specifically, GCE machines have a daemon which learns what external IPs are forwarded to that
  893. // machine, and configure a local route for that IP, making a match for --dst-type LOCAL when we don't want it to.
  894. // Removing the route gives correct behavior until the daemon recreates it.
  895. // Killing the daemon is an option, but means that any non-kubernetes use of the machine with external IP will be broken.
  896. //
  897. // This applies to IPs on GCE that are actually from a load-balancer; they will be categorized as LOCAL.
  898. // _If_ the chains were in the wrong order, and the LB traffic had dst-port == a NodePort on some other service,
  899. // the NodePort would take priority (incorrectly).
  900. // This is unlikely (and would only affect outgoing traffic from the cluster to the load balancer, which seems
  901. // doubly-unlikely), but we need to be careful to keep the rules in the right order.
  902. args := []string{ /* service-cluster-ip-range matching could go here */ }
  903. args = append(args, "-m", "comment", "--comment", "handle ClusterIPs; NOTE: this must be before the NodePort rules")
  904. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
  905. return err
  906. }
  907. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
  908. return err
  909. }
  910. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
  911. return err
  912. }
  913. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
  914. return err
  915. }
  916. // This set of rules matches broadly (addrtype & destination port), and therefore must come after the portal rules
  917. args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
  918. args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
  919. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
  920. return err
  921. }
  922. if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
  923. return err
  924. }
  925. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
  926. return err
  927. }
  928. if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
  929. return err
  930. }
  931. // Create a chain intended to explicitly allow non-local NodePort
  932. // traffic to work around default-deny iptables configurations
  933. // that would otherwise reject such traffic.
  934. args = []string{"-m", "comment", "--comment", "Ensure that non-local NodePort traffic can flow"}
  935. if _, err := ipt.EnsureChain(iptables.TableFilter, iptablesNonLocalNodePortChain); err != nil {
  936. return err
  937. }
  938. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableFilter, iptables.ChainInput, append(args, "-j", string(iptablesNonLocalNodePortChain))...); err != nil {
  939. return err
  940. }
  941. // TODO: Verify order of rules.
  942. return nil
  943. }
  944. // Flush all of our custom iptables rules.
  945. func iptablesFlush(ipt iptables.Interface) error {
  946. el := []error{}
  947. if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
  948. el = append(el, err)
  949. }
  950. if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
  951. el = append(el, err)
  952. }
  953. if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
  954. el = append(el, err)
  955. }
  956. if err := ipt.FlushChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
  957. el = append(el, err)
  958. }
  959. if err := ipt.FlushChain(iptables.TableFilter, iptablesNonLocalNodePortChain); err != nil {
  960. el = append(el, err)
  961. }
  962. if len(el) != 0 {
  963. klog.Errorf("Some errors flushing old iptables portals: %v", el)
  964. }
  965. return utilerrors.NewAggregate(el)
  966. }
  967. // Used below.
  968. var zeroIPv4 = net.ParseIP("0.0.0.0")
  969. var localhostIPv4 = net.ParseIP("127.0.0.1")
  970. var zeroIPv6 = net.ParseIP("::")
  971. var localhostIPv6 = net.ParseIP("::1")
  972. // Build a slice of iptables args that are common to from-container and from-host portal rules.
  973. func iptablesCommonPortalArgs(destIP net.IP, addPhysicalInterfaceMatch bool, addDstLocalMatch bool, destPort int, protocol v1.Protocol, service proxy.ServicePortName) []string {
  974. // This list needs to include all fields as they are eventually spit out
  975. // by iptables-save. This is because some systems do not support the
  976. // 'iptables -C' arg, and so fall back on parsing iptables-save output.
  977. // If this does not match, it will not pass the check. For example:
  978. // adding the /32 on the destination IP arg is not strictly required,
  979. // but causes this list to not match the final iptables-save output.
  980. // This is fragile and I hope one day we can stop supporting such old
  981. // iptables versions.
  982. args := []string{
  983. "-m", "comment",
  984. "--comment", service.String(),
  985. "-p", strings.ToLower(string(protocol)),
  986. "-m", strings.ToLower(string(protocol)),
  987. "--dport", fmt.Sprintf("%d", destPort),
  988. }
  989. if destIP != nil {
  990. args = append(args, "-d", utilproxy.ToCIDR(destIP))
  991. }
  992. if addPhysicalInterfaceMatch {
  993. args = append(args, "-m", "physdev", "!", "--physdev-is-in")
  994. }
  995. if addDstLocalMatch {
  996. args = append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  997. }
  998. return args
  999. }
  1000. // Build a slice of iptables args for a from-container portal rule.
  1001. func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, addPhysicalInterfaceMatch bool, addDstLocalMatch bool, destPort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  1002. args := iptablesCommonPortalArgs(destIP, addPhysicalInterfaceMatch, addDstLocalMatch, destPort, protocol, service)
  1003. // This is tricky.
  1004. //
  1005. // If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
  1006. // interface") we want to use REDIRECT, which sends traffic to the
  1007. // "primary address of the incoming interface" which means the container
  1008. // bridge, if there is one. When the response comes, it comes from that
  1009. // same interface, so the NAT matches and the response packet is
  1010. // correct. This matters for UDP, since there is no per-connection port
  1011. // number.
  1012. //
  1013. // The alternative would be to use DNAT, except that it doesn't work
  1014. // (empirically):
  1015. // * DNAT to 127.0.0.1 = Packets just disappear - this seems to be a
  1016. // well-known limitation of iptables.
  1017. // * DNAT to eth0's IP = Response packets come from the bridge, which
  1018. // breaks the NAT, and makes things like DNS not accept them. If
  1019. // this could be resolved, it would simplify all of this code.
  1020. //
  1021. // If the proxy is bound to a specific IP, then we have to use DNAT to
  1022. // that IP. Unlike the previous case, this works because the proxy is
  1023. // ONLY listening on that IP, not the bridge.
  1024. //
  1025. // Why would anyone bind to an address that is not inclusive of
  1026. // localhost? Apparently some cloud environments have their public IP
  1027. // exposed as a real network interface AND do not have firewalling. We
  1028. // don't want to expose everything out to the world.
  1029. //
  1030. // Unfortunately, I don't know of any way to listen on some (N > 1)
  1031. // interfaces but not ALL interfaces, short of doing it manually, and
  1032. // this is simpler than that.
  1033. //
  1034. // If the proxy is bound to localhost only, all of this is broken. Not
  1035. // allowed.
  1036. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  1037. // TODO: Can we REDIRECT with IPv6?
  1038. args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
  1039. } else {
  1040. // TODO: Can we DNAT with IPv6?
  1041. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  1042. }
  1043. return args
  1044. }
  1045. // Build a slice of iptables args for a from-host portal rule.
  1046. func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, addDstLocalMatch bool, destPort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  1047. args := iptablesCommonPortalArgs(destIP, false, addDstLocalMatch, destPort, protocol, service)
  1048. // This is tricky.
  1049. //
  1050. // If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
  1051. // interface") we want to do the same as from-container traffic and use
  1052. // REDIRECT. Except that it doesn't work (empirically). REDIRECT on
  1053. // local packets sends the traffic to localhost (special case, but it is
  1054. // documented) but the response comes from the eth0 IP (not sure why,
  1055. // truthfully), which makes DNS unhappy.
  1056. //
  1057. // So we have to use DNAT. DNAT to 127.0.0.1 can't work for the same
  1058. // reason.
  1059. //
  1060. // So we do our best to find an interface that is not a loopback and
  1061. // DNAT to that. This works (again, empirically).
  1062. //
  1063. // If the proxy is bound to a specific IP, then we have to use DNAT to
  1064. // that IP. Unlike the previous case, this works because the proxy is
  1065. // ONLY listening on that IP, not the bridge.
  1066. //
  1067. // If the proxy is bound to localhost only, this should work, but we
  1068. // don't allow it for now.
  1069. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  1070. proxyIP = proxier.hostIP
  1071. }
  1072. // TODO: Can we DNAT with IPv6?
  1073. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  1074. return args
  1075. }
  1076. // Build a slice of iptables args for a from-host public-port rule.
  1077. // See iptablesHostPortalArgs
  1078. // TODO: Should we just reuse iptablesHostPortalArgs?
  1079. func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  1080. args := iptablesCommonPortalArgs(nil, false, false, nodePort, protocol, service)
  1081. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  1082. proxyIP = proxier.hostIP
  1083. }
  1084. // TODO: Can we DNAT with IPv6?
  1085. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  1086. return args
  1087. }
  1088. // Build a slice of iptables args for an from-non-local public-port rule.
  1089. func (proxier *Proxier) iptablesNonLocalNodePortArgs(nodePort int, protocol v1.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  1090. args := iptablesCommonPortalArgs(nil, false, false, proxyPort, protocol, service)
  1091. args = append(args, "-m", "state", "--state", "NEW", "-j", "ACCEPT")
  1092. return args
  1093. }
  1094. func isTooManyFDsError(err error) bool {
  1095. return strings.Contains(err.Error(), "too many open files")
  1096. }
  1097. func isClosedError(err error) bool {
  1098. // A brief discussion about handling closed error here:
  1099. // https://code.google.com/p/go/issues/detail?id=4373#c14
  1100. // TODO: maybe create a stoppable TCP listener that returns a StoppedError
  1101. return strings.HasSuffix(err.Error(), "use of closed network connection")
  1102. }