proxier.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "fmt"
  16. "net"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "k8s.io/klog"
  23. v1 "k8s.io/api/core/v1"
  24. "k8s.io/apimachinery/pkg/types"
  25. utilnet "k8s.io/apimachinery/pkg/util/net"
  26. "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/kubernetes/pkg/apis/core/v1/helper"
  28. "k8s.io/kubernetes/pkg/proxy"
  29. "k8s.io/kubernetes/pkg/proxy/config"
  30. "k8s.io/kubernetes/pkg/util/netsh"
  31. )
  32. const allAvailableInterfaces string = ""
  33. type portal struct {
  34. ip string
  35. port int
  36. isExternal bool
  37. }
  38. type serviceInfo struct {
  39. isAliveAtomic int32 // Only access this with atomic ops
  40. portal portal
  41. protocol v1.Protocol
  42. socket proxySocket
  43. timeout time.Duration
  44. activeClients *clientCache
  45. dnsClients *dnsClientCache
  46. sessionAffinityType v1.ServiceAffinity
  47. }
  48. func (info *serviceInfo) setAlive(b bool) {
  49. var i int32
  50. if b {
  51. i = 1
  52. }
  53. atomic.StoreInt32(&info.isAliveAtomic, i)
  54. }
  55. func (info *serviceInfo) isAlive() bool {
  56. return atomic.LoadInt32(&info.isAliveAtomic) != 0
  57. }
  58. func logTimeout(err error) bool {
  59. if e, ok := err.(net.Error); ok {
  60. if e.Timeout() {
  61. klog.V(3).Infof("connection to endpoint closed due to inactivity")
  62. return true
  63. }
  64. }
  65. return false
  66. }
  67. // Proxier is a simple proxy for TCP connections between a localhost:lport
  68. // and services that provide the actual implementations.
  69. type Proxier struct {
  70. // EndpointSlice support has not been added for this proxier yet.
  71. config.NoopEndpointSliceHandler
  72. // TODO(imroc): implement node handler for winuserspace proxier.
  73. config.NoopNodeHandler
  74. loadBalancer LoadBalancer
  75. mu sync.Mutex // protects serviceMap
  76. serviceMap map[ServicePortPortalName]*serviceInfo
  77. syncPeriod time.Duration
  78. udpIdleTimeout time.Duration
  79. numProxyLoops int32 // use atomic ops to access this; mostly for testing
  80. netsh netsh.Interface
  81. hostIP net.IP
  82. }
  83. // assert Proxier is a proxy.Provider
  84. var _ proxy.Provider = &Proxier{}
  85. var (
  86. // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
  87. // the loopback address. May be checked for by callers of NewProxier to know whether
  88. // the caller provided invalid input.
  89. ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
  90. )
  91. // Used below.
  92. var localhostIPv4 = net.ParseIP("127.0.0.1")
  93. var localhostIPv6 = net.ParseIP("::1")
  94. // NewProxier returns a new Proxier given a LoadBalancer and an address on
  95. // which to listen. It is assumed that there is only a single Proxier active
  96. // on a machine. An error will be returned if the proxier cannot be started
  97. // due to an invalid ListenIP (loopback)
  98. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  99. if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
  100. return nil, ErrProxyOnLocalhost
  101. }
  102. hostIP, err := utilnet.ChooseHostInterface()
  103. if err != nil {
  104. return nil, fmt.Errorf("failed to select a host interface: %v", err)
  105. }
  106. klog.V(2).Infof("Setting proxy IP to %v", hostIP)
  107. return createProxier(loadBalancer, listenIP, netsh, hostIP, syncPeriod, udpIdleTimeout)
  108. }
  109. func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  110. return &Proxier{
  111. loadBalancer: loadBalancer,
  112. serviceMap: make(map[ServicePortPortalName]*serviceInfo),
  113. syncPeriod: syncPeriod,
  114. udpIdleTimeout: udpIdleTimeout,
  115. netsh: netsh,
  116. hostIP: hostIP,
  117. }, nil
  118. }
  119. // Sync is called to immediately synchronize the proxier state
  120. func (proxier *Proxier) Sync() {
  121. proxier.cleanupStaleStickySessions()
  122. }
  123. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  124. func (proxier *Proxier) SyncLoop() {
  125. t := time.NewTicker(proxier.syncPeriod)
  126. defer t.Stop()
  127. for {
  128. <-t.C
  129. klog.V(6).Infof("Periodic sync")
  130. proxier.Sync()
  131. }
  132. }
  133. // cleanupStaleStickySessions cleans up any stale sticky session records in the hash map.
  134. func (proxier *Proxier) cleanupStaleStickySessions() {
  135. proxier.mu.Lock()
  136. defer proxier.mu.Unlock()
  137. servicePortNameMap := make(map[proxy.ServicePortName]bool)
  138. for name := range proxier.serviceMap {
  139. servicePortName := proxy.ServicePortName{
  140. NamespacedName: types.NamespacedName{
  141. Namespace: name.Namespace,
  142. Name: name.Name,
  143. },
  144. Port: name.Port,
  145. }
  146. if !servicePortNameMap[servicePortName] {
  147. // ensure cleanup sticky sessions only gets called once per serviceportname
  148. servicePortNameMap[servicePortName] = true
  149. proxier.loadBalancer.CleanupStaleStickySessions(servicePortName)
  150. }
  151. }
  152. }
  153. // This assumes proxier.mu is not locked.
  154. func (proxier *Proxier) stopProxy(service ServicePortPortalName, info *serviceInfo) error {
  155. proxier.mu.Lock()
  156. defer proxier.mu.Unlock()
  157. return proxier.stopProxyInternal(service, info)
  158. }
  159. // This assumes proxier.mu is locked.
  160. func (proxier *Proxier) stopProxyInternal(service ServicePortPortalName, info *serviceInfo) error {
  161. delete(proxier.serviceMap, service)
  162. info.setAlive(false)
  163. err := info.socket.Close()
  164. return err
  165. }
  166. func (proxier *Proxier) getServiceInfo(service ServicePortPortalName) (*serviceInfo, bool) {
  167. proxier.mu.Lock()
  168. defer proxier.mu.Unlock()
  169. info, ok := proxier.serviceMap[service]
  170. return info, ok
  171. }
  172. func (proxier *Proxier) setServiceInfo(service ServicePortPortalName, info *serviceInfo) {
  173. proxier.mu.Lock()
  174. defer proxier.mu.Unlock()
  175. proxier.serviceMap[service] = info
  176. }
  177. // addServicePortPortal starts listening for a new service, returning the serviceInfo.
  178. // The timeout only applies to UDP connections, for now.
  179. func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPortalName, protocol v1.Protocol, listenIP string, port int, timeout time.Duration) (*serviceInfo, error) {
  180. var serviceIP net.IP
  181. if listenIP != allAvailableInterfaces {
  182. if serviceIP = net.ParseIP(listenIP); serviceIP == nil {
  183. return nil, fmt.Errorf("could not parse ip '%q'", listenIP)
  184. }
  185. // add the IP address. Node port binds to all interfaces.
  186. args := proxier.netshIpv4AddressAddArgs(serviceIP)
  187. if existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP); err != nil {
  188. return nil, err
  189. } else if !existed {
  190. klog.V(3).Infof("Added ip address to fowarder interface for service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(port)), protocol)
  191. }
  192. }
  193. // add the listener, proxy
  194. sock, err := newProxySocket(protocol, serviceIP, port)
  195. if err != nil {
  196. return nil, err
  197. }
  198. si := &serviceInfo{
  199. isAliveAtomic: 1,
  200. portal: portal{
  201. ip: listenIP,
  202. port: port,
  203. isExternal: false,
  204. },
  205. protocol: protocol,
  206. socket: sock,
  207. timeout: timeout,
  208. activeClients: newClientCache(),
  209. dnsClients: newDNSClientCache(),
  210. sessionAffinityType: v1.ServiceAffinityNone, // default
  211. }
  212. proxier.setServiceInfo(servicePortPortalName, si)
  213. klog.V(2).Infof("Proxying for service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(port)), protocol)
  214. go func(service ServicePortPortalName, proxier *Proxier) {
  215. defer runtime.HandleCrash()
  216. atomic.AddInt32(&proxier.numProxyLoops, 1)
  217. sock.ProxyLoop(service, si, proxier)
  218. atomic.AddInt32(&proxier.numProxyLoops, -1)
  219. }(servicePortPortalName, proxier)
  220. return si, nil
  221. }
  222. func (proxier *Proxier) closeServicePortPortal(servicePortPortalName ServicePortPortalName, info *serviceInfo) error {
  223. // turn off the proxy
  224. if err := proxier.stopProxy(servicePortPortalName, info); err != nil {
  225. return err
  226. }
  227. // close the PortalProxy by deleting the service IP address
  228. if info.portal.ip != allAvailableInterfaces {
  229. serviceIP := net.ParseIP(info.portal.ip)
  230. args := proxier.netshIpv4AddressDeleteArgs(serviceIP)
  231. if err := proxier.netsh.DeleteIPAddress(args); err != nil {
  232. return err
  233. }
  234. }
  235. return nil
  236. }
  237. // getListenIPPortMap returns a slice of all listen IPs for a service.
  238. func getListenIPPortMap(service *v1.Service, listenPort int, nodePort int) map[string]int {
  239. listenIPPortMap := make(map[string]int)
  240. listenIPPortMap[service.Spec.ClusterIP] = listenPort
  241. for _, ip := range service.Spec.ExternalIPs {
  242. listenIPPortMap[ip] = listenPort
  243. }
  244. for _, ingress := range service.Status.LoadBalancer.Ingress {
  245. listenIPPortMap[ingress.IP] = listenPort
  246. }
  247. if nodePort != 0 {
  248. listenIPPortMap[allAvailableInterfaces] = nodePort
  249. }
  250. return listenIPPortMap
  251. }
  252. func (proxier *Proxier) mergeService(service *v1.Service) map[ServicePortPortalName]bool {
  253. if service == nil {
  254. return nil
  255. }
  256. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  257. if !helper.IsServiceIPSet(service) {
  258. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  259. return nil
  260. }
  261. existingPortPortals := make(map[ServicePortPortalName]bool)
  262. for i := range service.Spec.Ports {
  263. servicePort := &service.Spec.Ports[i]
  264. // create a slice of all the source IPs to use for service port portals
  265. listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
  266. protocol := servicePort.Protocol
  267. for listenIP, listenPort := range listenIPPortMap {
  268. servicePortPortalName := ServicePortPortalName{
  269. NamespacedName: svcName,
  270. Port: servicePort.Name,
  271. PortalIPName: listenIP,
  272. }
  273. existingPortPortals[servicePortPortalName] = true
  274. info, exists := proxier.getServiceInfo(servicePortPortalName)
  275. if exists && sameConfig(info, service, protocol, listenPort) {
  276. // Nothing changed.
  277. continue
  278. }
  279. if exists {
  280. klog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
  281. if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
  282. klog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
  283. }
  284. }
  285. klog.V(1).Infof("Adding new service %q at %s/%s", servicePortPortalName, net.JoinHostPort(listenIP, strconv.Itoa(listenPort)), protocol)
  286. info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
  287. if err != nil {
  288. klog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
  289. continue
  290. }
  291. info.sessionAffinityType = service.Spec.SessionAffinity
  292. klog.V(10).Infof("info: %#v", info)
  293. }
  294. if len(listenIPPortMap) > 0 {
  295. // only one loadbalancer per service port portal
  296. servicePortName := proxy.ServicePortName{
  297. NamespacedName: types.NamespacedName{
  298. Namespace: service.Namespace,
  299. Name: service.Name,
  300. },
  301. Port: servicePort.Name,
  302. }
  303. timeoutSeconds := 0
  304. if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
  305. timeoutSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
  306. }
  307. proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, timeoutSeconds)
  308. }
  309. }
  310. return existingPortPortals
  311. }
  312. func (proxier *Proxier) unmergeService(service *v1.Service, existingPortPortals map[ServicePortPortalName]bool) {
  313. if service == nil {
  314. return
  315. }
  316. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  317. if !helper.IsServiceIPSet(service) {
  318. klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
  319. return
  320. }
  321. servicePortNameMap := make(map[proxy.ServicePortName]bool)
  322. for name := range existingPortPortals {
  323. servicePortName := proxy.ServicePortName{
  324. NamespacedName: types.NamespacedName{
  325. Namespace: name.Namespace,
  326. Name: name.Name,
  327. },
  328. Port: name.Port,
  329. }
  330. servicePortNameMap[servicePortName] = true
  331. }
  332. for i := range service.Spec.Ports {
  333. servicePort := &service.Spec.Ports[i]
  334. serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
  335. // create a slice of all the source IPs to use for service port portals
  336. listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
  337. for listenIP := range listenIPPortMap {
  338. servicePortPortalName := ServicePortPortalName{
  339. NamespacedName: svcName,
  340. Port: servicePort.Name,
  341. PortalIPName: listenIP,
  342. }
  343. if existingPortPortals[servicePortPortalName] {
  344. continue
  345. }
  346. klog.V(1).Infof("Stopping service %q", servicePortPortalName)
  347. info, exists := proxier.getServiceInfo(servicePortPortalName)
  348. if !exists {
  349. klog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName)
  350. continue
  351. }
  352. if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
  353. klog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
  354. }
  355. }
  356. // Only delete load balancer if all listen ips per name/port show inactive.
  357. if !servicePortNameMap[serviceName] {
  358. proxier.loadBalancer.DeleteService(serviceName)
  359. }
  360. }
  361. }
  362. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  363. _ = proxier.mergeService(service)
  364. }
  365. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  366. existingPortPortals := proxier.mergeService(service)
  367. proxier.unmergeService(oldService, existingPortPortals)
  368. }
  369. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  370. proxier.unmergeService(service, map[ServicePortPortalName]bool{})
  371. }
  372. func (proxier *Proxier) OnServiceSynced() {
  373. }
  374. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  375. proxier.loadBalancer.OnEndpointsAdd(endpoints)
  376. }
  377. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  378. proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
  379. }
  380. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  381. proxier.loadBalancer.OnEndpointsDelete(endpoints)
  382. }
  383. func (proxier *Proxier) OnEndpointsSynced() {
  384. proxier.loadBalancer.OnEndpointsSynced()
  385. }
  386. func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool {
  387. return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
  388. }
  389. func isTooManyFDsError(err error) bool {
  390. return strings.Contains(err.Error(), "too many open files")
  391. }
  392. func isClosedError(err error) bool {
  393. // A brief discussion about handling closed error here:
  394. // https://code.google.com/p/go/issues/detail?id=4373#c14
  395. // TODO: maybe create a stoppable TCP listener that returns a StoppedError
  396. return strings.HasSuffix(err.Error(), "use of closed network connection")
  397. }
  398. func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string {
  399. intName := proxier.netsh.GetInterfaceToAddIP()
  400. args := []string{
  401. "interface", "ipv4", "add", "address",
  402. "name=" + intName,
  403. "address=" + destIP.String(),
  404. }
  405. return args
  406. }
  407. func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string {
  408. intName := proxier.netsh.GetInterfaceToAddIP()
  409. args := []string{
  410. "interface", "ipv4", "delete", "address",
  411. "name=" + intName,
  412. "address=" + destIP.String(),
  413. }
  414. return args
  415. }