proxier.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "fmt"
  16. "net"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "k8s.io/klog"
  23. "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/types"
  25. utilnet "k8s.io/apimachinery/pkg/util/net"
  26. "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  28. "k8s.io/kubernetes/pkg/proxy"
  29. "k8s.io/kubernetes/pkg/util/netsh"
  30. )
  31. const allAvailableInterfaces string = ""
  32. type portal struct {
  33. ip string
  34. port int
  35. isExternal bool
  36. }
  37. type serviceInfo struct {
  38. isAliveAtomic int32 // Only access this with atomic ops
  39. portal portal
  40. protocol v1.Protocol
  41. socket proxySocket
  42. timeout time.Duration
  43. activeClients *clientCache
  44. dnsClients *dnsClientCache
  45. sessionAffinityType v1.ServiceAffinity
  46. }
  47. func (info *serviceInfo) setAlive(b bool) {
  48. var i int32
  49. if b {
  50. i = 1
  51. }
  52. atomic.StoreInt32(&info.isAliveAtomic, i)
  53. }
  54. func (info *serviceInfo) isAlive() bool {
  55. return atomic.LoadInt32(&info.isAliveAtomic) != 0
  56. }
  57. func logTimeout(err error) bool {
  58. if e, ok := err.(net.Error); ok {
  59. if e.Timeout() {
  60. klog.V(3).Infof("connection to endpoint closed due to inactivity")
  61. return true
  62. }
  63. }
  64. return false
  65. }
  66. // Proxier is a simple proxy for TCP connections between a localhost:lport
  67. // and services that provide the actual implementations.
  68. type Proxier struct {
  69. loadBalancer LoadBalancer
  70. mu sync.Mutex // protects serviceMap
  71. serviceMap map[ServicePortPortalName]*serviceInfo
  72. syncPeriod time.Duration
  73. udpIdleTimeout time.Duration
  74. portMapMutex sync.Mutex
  75. portMap map[portMapKey]*portMapValue
  76. numProxyLoops int32 // use atomic ops to access this; mostly for testing
  77. netsh netsh.Interface
  78. hostIP net.IP
  79. }
  80. // assert Proxier is a ProxyProvider
  81. var _ proxy.ProxyProvider = &Proxier{}
  82. // A key for the portMap. The ip has to be a string because slices can't be map
  83. // keys.
  84. type portMapKey struct {
  85. ip string
  86. port int
  87. protocol v1.Protocol
  88. }
  89. func (k *portMapKey) String() string {
  90. return fmt.Sprintf("%s/%s", net.JoinHostPort(k.ip, strconv.Itoa(k.port)), k.protocol)
  91. }
  92. // A value for the portMap
  93. type portMapValue struct {
  94. owner ServicePortPortalName
  95. socket interface {
  96. Close() error
  97. }
  98. }
  99. var (
  100. // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
  101. // the loopback address. May be checked for by callers of NewProxier to know whether
  102. // the caller provided invalid input.
  103. ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
  104. )
  105. // Used below.
  106. var localhostIPv4 = net.ParseIP("127.0.0.1")
  107. var localhostIPv6 = net.ParseIP("::1")
  108. // NewProxier returns a new Proxier given a LoadBalancer and an address on
  109. // which to listen. It is assumed that there is only a single Proxier active
  110. // on a machine. An error will be returned if the proxier cannot be started
  111. // due to an invalid ListenIP (loopback)
  112. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  113. if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
  114. return nil, ErrProxyOnLocalhost
  115. }
  116. hostIP, err := utilnet.ChooseHostInterface()
  117. if err != nil {
  118. return nil, fmt.Errorf("failed to select a host interface: %v", err)
  119. }
  120. klog.V(2).Infof("Setting proxy IP to %v", hostIP)
  121. return createProxier(loadBalancer, listenIP, netsh, hostIP, syncPeriod, udpIdleTimeout)
  122. }
  123. func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  124. return &Proxier{
  125. loadBalancer: loadBalancer,
  126. serviceMap: make(map[ServicePortPortalName]*serviceInfo),
  127. portMap: make(map[portMapKey]*portMapValue),
  128. syncPeriod: syncPeriod,
  129. udpIdleTimeout: udpIdleTimeout,
  130. netsh: netsh,
  131. hostIP: hostIP,
  132. }, nil
  133. }
  134. // Sync is called to immediately synchronize the proxier state
  135. func (proxier *Proxier) Sync() {
  136. proxier.cleanupStaleStickySessions()
  137. }
  138. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  139. func (proxier *Proxier) SyncLoop() {
  140. t := time.NewTicker(proxier.syncPeriod)
  141. defer t.Stop()
  142. for {
  143. <-t.C
  144. klog.V(6).Infof("Periodic sync")
  145. proxier.Sync()
  146. }
  147. }
  148. // cleanupStaleStickySessions cleans up any stale sticky session records in the hash map.
  149. func (proxier *Proxier) cleanupStaleStickySessions() {
  150. proxier.mu.Lock()
  151. defer proxier.mu.Unlock()
  152. servicePortNameMap := make(map[proxy.ServicePortName]bool)
  153. for name := range proxier.serviceMap {
  154. servicePortName := proxy.ServicePortName{
  155. NamespacedName: types.NamespacedName{
  156. Namespace: name.Namespace,
  157. Name: name.Name,
  158. },
  159. Port: name.Port,
  160. }
  161. if servicePortNameMap[servicePortName] == false {
  162. // ensure cleanup sticky sessions only gets called once per serviceportname
  163. servicePortNameMap[servicePortName] = true
  164. proxier.loadBalancer.CleanupStaleStickySessions(servicePortName)
  165. }
  166. }
  167. }
  168. // This assumes proxier.mu is not locked.
  169. func (proxier *Proxier) stopProxy(service ServicePortPortalName, info *serviceInfo) error {
  170. proxier.mu.Lock()
  171. defer proxier.mu.Unlock()
  172. return proxier.stopProxyInternal(service, info)
  173. }
  174. // This assumes proxier.mu is locked.
  175. func (proxier *Proxier) stopProxyInternal(service ServicePortPortalName, info *serviceInfo) error {
  176. delete(proxier.serviceMap, service)
  177. info.setAlive(false)
  178. err := info.socket.Close()
  179. return err
  180. }
  181. func (proxier *Proxier) getServiceInfo(service ServicePortPortalName) (*serviceInfo, bool) {
  182. proxier.mu.Lock()
  183. defer proxier.mu.Unlock()
  184. info, ok := proxier.serviceMap[service]
  185. return info, ok
  186. }
  187. func (proxier *Proxier) setServiceInfo(service ServicePortPortalName, info *serviceInfo) {
  188. proxier.mu.Lock()
  189. defer proxier.mu.Unlock()
  190. proxier.serviceMap[service] = info
  191. }
  192. // addServicePortPortal starts listening for a new service, returning the serviceInfo.
  193. // The timeout only applies to UDP connections, for now.
  194. func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPortalName, protocol v1.Protocol, listenIP string, port int, timeout time.Duration) (*serviceInfo, error) {
  195. var serviceIP net.IP
  196. if listenIP != allAvailableInterfaces {
  197. if serviceIP = net.ParseIP(listenIP); serviceIP == nil {
  198. return nil, fmt.Errorf("could not parse ip '%q'", listenIP)
  199. }
  200. // add the IP address. Node port binds to all interfaces.
  201. args := proxier.netshIpv4AddressAddArgs(serviceIP)
  202. if existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP); err != nil {
  203. return nil, err
  204. } else if !existed {
  205. klog.V(3).Infof("Added ip address to fowarder interface for service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(port)), protocol)
  206. }
  207. }
  208. // add the listener, proxy
  209. sock, err := newProxySocket(protocol, serviceIP, port)
  210. if err != nil {
  211. return nil, err
  212. }
  213. si := &serviceInfo{
  214. isAliveAtomic: 1,
  215. portal: portal{
  216. ip: listenIP,
  217. port: port,
  218. isExternal: false,
  219. },
  220. protocol: protocol,
  221. socket: sock,
  222. timeout: timeout,
  223. activeClients: newClientCache(),
  224. dnsClients: newDNSClientCache(),
  225. sessionAffinityType: v1.ServiceAffinityNone, // default
  226. }
  227. proxier.setServiceInfo(servicePortPortalName, si)
  228. klog.V(2).Infof("Proxying for service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(port)), protocol)
  229. go func(service ServicePortPortalName, proxier *Proxier) {
  230. defer runtime.HandleCrash()
  231. atomic.AddInt32(&proxier.numProxyLoops, 1)
  232. sock.ProxyLoop(service, si, proxier)
  233. atomic.AddInt32(&proxier.numProxyLoops, -1)
  234. }(servicePortPortalName, proxier)
  235. return si, nil
  236. }
  237. func (proxier *Proxier) closeServicePortPortal(servicePortPortalName ServicePortPortalName, info *serviceInfo) error {
  238. // turn off the proxy
  239. if err := proxier.stopProxy(servicePortPortalName, info); err != nil {
  240. return err
  241. }
  242. // close the PortalProxy by deleting the service IP address
  243. if info.portal.ip != allAvailableInterfaces {
  244. serviceIP := net.ParseIP(info.portal.ip)
  245. args := proxier.netshIpv4AddressDeleteArgs(serviceIP)
  246. if err := proxier.netsh.DeleteIPAddress(args); err != nil {
  247. return err
  248. }
  249. }
  250. return nil
  251. }
  252. // getListenIPPortMap returns a slice of all listen IPs for a service.
  253. func getListenIPPortMap(service *v1.Service, listenPort int, nodePort int) map[string]int {
  254. listenIPPortMap := make(map[string]int)
  255. listenIPPortMap[service.Spec.ClusterIP] = listenPort
  256. for _, ip := range service.Spec.ExternalIPs {
  257. listenIPPortMap[ip] = listenPort
  258. }
  259. for _, ingress := range service.Status.LoadBalancer.Ingress {
  260. listenIPPortMap[ingress.IP] = listenPort
  261. }
  262. if nodePort != 0 {
  263. listenIPPortMap[allAvailableInterfaces] = nodePort
  264. }
  265. return listenIPPortMap
  266. }
  267. func (proxier *Proxier) mergeService(service *v1.Service) map[ServicePortPortalName]bool {
  268. if service == nil {
  269. return nil
  270. }
  271. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  272. if !helper.IsServiceIPSet(service) {
  273. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  274. return nil
  275. }
  276. existingPortPortals := make(map[ServicePortPortalName]bool)
  277. for i := range service.Spec.Ports {
  278. servicePort := &service.Spec.Ports[i]
  279. // create a slice of all the source IPs to use for service port portals
  280. listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
  281. protocol := servicePort.Protocol
  282. for listenIP, listenPort := range listenIPPortMap {
  283. servicePortPortalName := ServicePortPortalName{
  284. NamespacedName: svcName,
  285. Port: servicePort.Name,
  286. PortalIPName: listenIP,
  287. }
  288. existingPortPortals[servicePortPortalName] = true
  289. info, exists := proxier.getServiceInfo(servicePortPortalName)
  290. if exists && sameConfig(info, service, protocol, listenPort) {
  291. // Nothing changed.
  292. continue
  293. }
  294. if exists {
  295. klog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
  296. if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
  297. klog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
  298. }
  299. }
  300. klog.V(1).Infof("Adding new service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(listenPort)), protocol)
  301. info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
  302. if err != nil {
  303. klog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
  304. continue
  305. }
  306. info.sessionAffinityType = service.Spec.SessionAffinity
  307. klog.V(10).Infof("info: %#v", info)
  308. }
  309. if len(listenIPPortMap) > 0 {
  310. // only one loadbalancer per service port portal
  311. servicePortName := proxy.ServicePortName{
  312. NamespacedName: types.NamespacedName{
  313. Namespace: service.Namespace,
  314. Name: service.Name,
  315. },
  316. Port: servicePort.Name,
  317. }
  318. timeoutSeconds := 0
  319. if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
  320. timeoutSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
  321. }
  322. proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, timeoutSeconds)
  323. }
  324. }
  325. return existingPortPortals
  326. }
  327. func (proxier *Proxier) unmergeService(service *v1.Service, existingPortPortals map[ServicePortPortalName]bool) {
  328. if service == nil {
  329. return
  330. }
  331. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  332. if !helper.IsServiceIPSet(service) {
  333. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  334. return
  335. }
  336. servicePortNameMap := make(map[proxy.ServicePortName]bool)
  337. for name := range existingPortPortals {
  338. servicePortName := proxy.ServicePortName{
  339. NamespacedName: types.NamespacedName{
  340. Namespace: name.Namespace,
  341. Name: name.Name,
  342. },
  343. Port: name.Port,
  344. }
  345. servicePortNameMap[servicePortName] = true
  346. }
  347. for i := range service.Spec.Ports {
  348. servicePort := &service.Spec.Ports[i]
  349. serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  350. // create a slice of all the source IPs to use for service port portals
  351. listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
  352. for listenIP := range listenIPPortMap {
  353. servicePortPortalName := ServicePortPortalName{
  354. NamespacedName: svcName,
  355. Port: servicePort.Name,
  356. PortalIPName: listenIP,
  357. }
  358. if existingPortPortals[servicePortPortalName] {
  359. continue
  360. }
  361. klog.V(1).Infof("Stopping service %q", servicePortPortalName)
  362. info, exists := proxier.getServiceInfo(servicePortPortalName)
  363. if !exists {
  364. klog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName)
  365. continue
  366. }
  367. if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
  368. klog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
  369. }
  370. }
  371. // Only delete load balancer if all listen ips per name/port show inactive.
  372. if !servicePortNameMap[serviceName] {
  373. proxier.loadBalancer.DeleteService(serviceName)
  374. }
  375. }
  376. }
  377. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  378. _ = proxier.mergeService(service)
  379. }
  380. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  381. existingPortPortals := proxier.mergeService(service)
  382. proxier.unmergeService(oldService, existingPortPortals)
  383. }
  384. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  385. proxier.unmergeService(service, map[ServicePortPortalName]bool{})
  386. }
  387. func (proxier *Proxier) OnServiceSynced() {
  388. }
  389. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  390. proxier.loadBalancer.OnEndpointsAdd(endpoints)
  391. }
  392. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  393. proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
  394. }
  395. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  396. proxier.loadBalancer.OnEndpointsDelete(endpoints)
  397. }
  398. func (proxier *Proxier) OnEndpointsSynced() {
  399. proxier.loadBalancer.OnEndpointsSynced()
  400. }
  401. func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool {
  402. return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
  403. }
  404. func isTooManyFDsError(err error) bool {
  405. return strings.Contains(err.Error(), "too many open files")
  406. }
  407. func isClosedError(err error) bool {
  408. // A brief discussion about handling closed error here:
  409. // https://code.google.com/p/go/issues/detail?id=4373#c14
  410. // TODO: maybe create a stoppable TCP listener that returns a StoppedError
  411. return strings.HasSuffix(err.Error(), "use of closed network connection")
  412. }
  413. func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string {
  414. intName := proxier.netsh.GetInterfaceToAddIP()
  415. args := []string{
  416. "interface", "ipv4", "add", "address",
  417. "name=" + intName,
  418. "address=" + destIP.String(),
  419. }
  420. return args
  421. }
  422. func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string {
  423. intName := proxier.netsh.GetInterfaceToAddIP()
  424. args := []string{
  425. "interface", "ipv4", "delete", "address",
  426. "name=" + intName,
  427. "address=" + destIP.String(),
  428. }
  429. return args
  430. }