proxysocket.go 18 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package winuserspace
  14. import (
  15. "fmt"
  16. "io"
  17. "net"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/miekg/dns"
  24. "k8s.io/api/core/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/klog"
  28. "k8s.io/kubernetes/pkg/proxy"
  29. "k8s.io/kubernetes/pkg/util/ipconfig"
  30. "k8s.io/utils/exec"
  31. )
  32. const (
  33. // Kubernetes DNS suffix search list
  34. // TODO: Get DNS suffix search list from docker containers.
  35. // --dns-search option doesn't work on Windows containers and has been
  36. // fixed recently in docker.
  37. // Kubernetes cluster domain
  38. clusterDomain = "cluster.local"
  39. // Kubernetes service domain
  40. serviceDomain = "svc." + clusterDomain
  41. // Kubernetes default namespace domain
  42. namespaceServiceDomain = "default." + serviceDomain
  43. // Kubernetes DNS service port name
  44. dnsPortName = "dns"
  45. // DNS TYPE value A (a host address)
  46. dnsTypeA uint16 = 0x01
  47. // DNS TYPE value AAAA (a host IPv6 address)
  48. dnsTypeAAAA uint16 = 0x1c
  49. // DNS CLASS value IN (the Internet)
  50. dnsClassInternet uint16 = 0x01
  51. )
  52. // Abstraction over TCP/UDP sockets which are proxied.
  53. type proxySocket interface {
  54. // Addr gets the net.Addr for a proxySocket.
  55. Addr() net.Addr
  56. // Close stops the proxySocket from accepting incoming connections.
  57. // Each implementation should comment on the impact of calling Close
  58. // while sessions are active.
  59. Close() error
  60. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
  61. ProxyLoop(service ServicePortPortalName, info *serviceInfo, proxier *Proxier)
  62. // ListenPort returns the host port that the proxySocket is listening on
  63. ListenPort() int
  64. }
  65. func newProxySocket(protocol v1.Protocol, ip net.IP, port int) (proxySocket, error) {
  66. host := ""
  67. if ip != nil {
  68. host = ip.String()
  69. }
  70. switch strings.ToUpper(string(protocol)) {
  71. case "TCP":
  72. listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
  73. if err != nil {
  74. return nil, err
  75. }
  76. return &tcpProxySocket{Listener: listener, port: port}, nil
  77. case "UDP":
  78. addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
  79. if err != nil {
  80. return nil, err
  81. }
  82. conn, err := net.ListenUDP("udp", addr)
  83. if err != nil {
  84. return nil, err
  85. }
  86. return &udpProxySocket{UDPConn: conn, port: port}, nil
  87. case "SCTP":
  88. return nil, fmt.Errorf("SCTP is not supported for user space proxy")
  89. }
  90. return nil, fmt.Errorf("unknown protocol %q", protocol)
  91. }
// endpointDialTimeout lists the timeouts used for successive attempts to dial
// a backend: tryConnect makes one attempt per entry, in order, so the wait
// grows from 250ms up to 2s before it gives up.
var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
// no new connections are allowed but existing connections are left untouched.
type tcpProxySocket struct {
	// Listener is the embedded TCP listener that accepts incoming connections.
	net.Listener
	// port is the value reported by ListenPort.
	port int
}
// ListenPort returns the port stored on the socket when it was created.
func (tcp *tcpProxySocket) ListenPort() int {
	return tcp.port
}
  103. func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
  104. sessionAffinityReset := false
  105. for _, dialTimeout := range endpointDialTimeout {
  106. servicePortName := proxy.ServicePortName{
  107. NamespacedName: types.NamespacedName{
  108. Namespace: service.Namespace,
  109. Name: service.Name,
  110. },
  111. Port: service.Port,
  112. }
  113. endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset)
  114. if err != nil {
  115. klog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
  116. return nil, err
  117. }
  118. klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
  119. // TODO: This could spin up a new goroutine to make the outbound connection,
  120. // and keep accepting inbound traffic.
  121. outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
  122. if err != nil {
  123. if isTooManyFDsError(err) {
  124. panic("Dial failed: " + err.Error())
  125. }
  126. klog.Errorf("Dial failed: %v", err)
  127. sessionAffinityReset = true
  128. continue
  129. }
  130. return outConn, nil
  131. }
  132. return nil, fmt.Errorf("failed to connect to an endpoint.")
  133. }
  134. func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
  135. for {
  136. if !myInfo.isAlive() {
  137. // The service port was closed or replaced.
  138. return
  139. }
  140. // Block until a connection is made.
  141. inConn, err := tcp.Accept()
  142. if err != nil {
  143. if isTooManyFDsError(err) {
  144. panic("Accept failed: " + err.Error())
  145. }
  146. if isClosedError(err) {
  147. return
  148. }
  149. if !myInfo.isAlive() {
  150. // Then the service port was just closed so the accept failure is to be expected.
  151. return
  152. }
  153. klog.Errorf("Accept failed: %v", err)
  154. continue
  155. }
  156. klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
  157. outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
  158. if err != nil {
  159. klog.Errorf("Failed to connect to balancer: %v", err)
  160. inConn.Close()
  161. continue
  162. }
  163. // Spin up an async copy loop.
  164. go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
  165. }
  166. }
  167. // proxyTCP proxies data bi-directionally between in and out.
  168. func proxyTCP(in, out *net.TCPConn) {
  169. var wg sync.WaitGroup
  170. wg.Add(2)
  171. klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
  172. in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
  173. go copyBytes("from backend", in, out, &wg)
  174. go copyBytes("to backend", out, in, &wg)
  175. wg.Wait()
  176. }
  177. func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
  178. defer wg.Done()
  179. klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
  180. n, err := io.Copy(dest, src)
  181. if err != nil {
  182. if !isClosedError(err) {
  183. klog.Errorf("I/O error: %v", err)
  184. }
  185. }
  186. klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
  187. dest.Close()
  188. src.Close()
  189. }
// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called,
// no new connections are allowed and existing connections are broken.
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
	// UDPConn is the embedded UDP socket that client packets are read from
	// and replies are written to.
	*net.UDPConn
	// port is the value reported by ListenPort.
	port int
}
// ListenPort returns the port stored on the socket when it was created.
func (udp *udpProxySocket) ListenPort() int {
	return udp.port
}
// Addr returns the local address of the underlying UDP connection.
func (udp *udpProxySocket) Addr() net.Addr {
	return udp.LocalAddr()
}
// clientCache holds all the known UDP clients that have not timed out.
type clientCache struct {
	// mu guards clients.
	mu sync.Mutex
	// clients maps a client address string to its backend connection.
	clients map[string]net.Conn // addr string -> connection
}
  208. func newClientCache() *clientCache {
  209. return &clientCache{clients: map[string]net.Conn{}}
  210. }
// dnsClientQuery identifies a DNS query client by address and QTYPE. It is
// the key type of the dnsClientCache map.
type dnsClientQuery struct {
	// clientAddress is the client's address as a string.
	clientAddress string
	// dnsQType is the QTYPE of the client's question.
	dnsQType uint16
}
// dnsClientCache holds the in-flight DNS client queries. Each value records
// the current index in the DNS suffix search list and the original DNS
// message for a given client and QTYPE.
type dnsClientCache struct {
	// mu guards clients.
	mu sync.Mutex
	// clients maps a (client address, QTYPE) pair to its query state.
	clients map[dnsClientQuery]*dnsQueryState
}
// dnsQueryState tracks the progress of one client query through the DNS
// suffix search list.
type dnsQueryState struct {
	// searchIndex is the index of the next search-list entry to try; it is
	// advanced with atomic operations.
	searchIndex int32
	// msg is the client's original query message, reused when the query is
	// retried with the next suffix.
	msg *dns.Msg
}
  226. func newDNSClientCache() *dnsClientCache {
  227. return &dnsClientCache{clients: map[dnsClientQuery]*dnsQueryState{}}
  228. }
  229. func packetRequiresDNSSuffix(dnsType, dnsClass uint16) bool {
  230. return (dnsType == dnsTypeA || dnsType == dnsTypeAAAA) && dnsClass == dnsClassInternet
  231. }
// isDNSService reports whether portName is the name of the Kubernetes DNS
// service port (dnsPortName).
func isDNSService(portName string) bool {
	return portName == dnsPortName
}
  235. func appendDNSSuffix(msg *dns.Msg, buffer []byte, length int, dnsSuffix string) (int, error) {
  236. if msg == nil || len(msg.Question) == 0 {
  237. return length, fmt.Errorf("DNS message parameter is invalid")
  238. }
  239. // Save the original name since it will be reused for next iteration
  240. origName := msg.Question[0].Name
  241. if dnsSuffix != "" {
  242. msg.Question[0].Name += dnsSuffix + "."
  243. }
  244. mbuf, err := msg.PackBuffer(buffer)
  245. msg.Question[0].Name = origName
  246. if err != nil {
  247. klog.Warningf("Unable to pack DNS packet. Error is: %v", err)
  248. return length, err
  249. }
  250. if &buffer[0] != &mbuf[0] {
  251. return length, fmt.Errorf("Buffer is too small in packing DNS packet")
  252. }
  253. return len(mbuf), nil
  254. }
  255. func recoverDNSQuestion(origName string, msg *dns.Msg, buffer []byte, length int) (int, error) {
  256. if msg == nil || len(msg.Question) == 0 {
  257. return length, fmt.Errorf("DNS message parameter is invalid")
  258. }
  259. if origName == msg.Question[0].Name {
  260. return length, nil
  261. }
  262. msg.Question[0].Name = origName
  263. if len(msg.Answer) > 0 {
  264. msg.Answer[0].Header().Name = origName
  265. }
  266. mbuf, err := msg.PackBuffer(buffer)
  267. if err != nil {
  268. klog.Warningf("Unable to pack DNS packet. Error is: %v", err)
  269. return length, err
  270. }
  271. if &buffer[0] != &mbuf[0] {
  272. return length, fmt.Errorf("Buffer is too small in packing DNS packet")
  273. }
  274. return len(mbuf), nil
  275. }
  276. func processUnpackedDNSQueryPacket(
  277. dnsClients *dnsClientCache,
  278. msg *dns.Msg,
  279. host string,
  280. dnsQType uint16,
  281. buffer []byte,
  282. length int,
  283. dnsSearch []string) int {
  284. if dnsSearch == nil || len(dnsSearch) == 0 {
  285. klog.V(1).Infof("DNS search list is not initialized and is empty.")
  286. return length
  287. }
  288. // TODO: handle concurrent queries from a client
  289. dnsClients.mu.Lock()
  290. state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}]
  291. if !found {
  292. state = &dnsQueryState{0, msg}
  293. dnsClients.clients[dnsClientQuery{host, dnsQType}] = state
  294. }
  295. dnsClients.mu.Unlock()
  296. index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1)
  297. // Also update message ID if the client retries due to previous query time out
  298. state.msg.MsgHdr.Id = msg.MsgHdr.Id
  299. if index < 0 || index >= int32(len(dnsSearch)) {
  300. klog.V(1).Infof("Search index %d is out of range.", index)
  301. return length
  302. }
  303. length, err := appendDNSSuffix(msg, buffer, length, dnsSearch[index])
  304. if err != nil {
  305. klog.Errorf("Append DNS suffix failed: %v", err)
  306. }
  307. return length
  308. }
  309. func processUnpackedDNSResponsePacket(
  310. svrConn net.Conn,
  311. dnsClients *dnsClientCache,
  312. msg *dns.Msg,
  313. rcode int,
  314. host string,
  315. dnsQType uint16,
  316. buffer []byte,
  317. length int,
  318. dnsSearch []string) (bool, int) {
  319. var drop bool
  320. var err error
  321. if dnsSearch == nil || len(dnsSearch) == 0 {
  322. klog.V(1).Infof("DNS search list is not initialized and is empty.")
  323. return drop, length
  324. }
  325. dnsClients.mu.Lock()
  326. state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}]
  327. dnsClients.mu.Unlock()
  328. if found {
  329. index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1)
  330. if rcode != 0 && index >= 0 && index < int32(len(dnsSearch)) {
  331. // If the response has failure and iteration through the search list has not
  332. // reached the end, retry on behalf of the client using the original query message
  333. drop = true
  334. length, err = appendDNSSuffix(state.msg, buffer, length, dnsSearch[index])
  335. if err != nil {
  336. klog.Errorf("Append DNS suffix failed: %v", err)
  337. }
  338. _, err = svrConn.Write(buffer[0:length])
  339. if err != nil {
  340. if !logTimeout(err) {
  341. klog.Errorf("Write failed: %v", err)
  342. }
  343. }
  344. } else {
  345. length, err = recoverDNSQuestion(state.msg.Question[0].Name, msg, buffer, length)
  346. if err != nil {
  347. klog.Errorf("Recover DNS question failed: %v", err)
  348. }
  349. dnsClients.mu.Lock()
  350. delete(dnsClients.clients, dnsClientQuery{host, dnsQType})
  351. dnsClients.mu.Unlock()
  352. }
  353. }
  354. return drop, length
  355. }
  356. func processDNSQueryPacket(
  357. dnsClients *dnsClientCache,
  358. cliAddr net.Addr,
  359. buffer []byte,
  360. length int,
  361. dnsSearch []string) (int, error) {
  362. msg := &dns.Msg{}
  363. if err := msg.Unpack(buffer[:length]); err != nil {
  364. klog.Warningf("Unable to unpack DNS packet. Error is: %v", err)
  365. return length, err
  366. }
  367. // Query - Response bit that specifies whether this message is a query (0) or a response (1).
  368. if msg.MsgHdr.Response == true {
  369. return length, fmt.Errorf("DNS packet should be a query message")
  370. }
  371. // QDCOUNT
  372. if len(msg.Question) != 1 {
  373. klog.V(1).Infof("Number of entries in the question section of the DNS packet is: %d", len(msg.Question))
  374. klog.V(1).Infof("DNS suffix appending does not support more than one question.")
  375. return length, nil
  376. }
  377. // ANCOUNT, NSCOUNT, ARCOUNT
  378. if len(msg.Answer) != 0 || len(msg.Ns) != 0 || len(msg.Extra) != 0 {
  379. klog.V(1).Infof("DNS packet contains more than question section.")
  380. return length, nil
  381. }
  382. dnsQType := msg.Question[0].Qtype
  383. dnsQClass := msg.Question[0].Qclass
  384. if packetRequiresDNSSuffix(dnsQType, dnsQClass) {
  385. host, _, err := net.SplitHostPort(cliAddr.String())
  386. if err != nil {
  387. klog.V(1).Infof("Failed to get host from client address: %v", err)
  388. host = cliAddr.String()
  389. }
  390. length = processUnpackedDNSQueryPacket(dnsClients, msg, host, dnsQType, buffer, length, dnsSearch)
  391. }
  392. return length, nil
  393. }
  394. func processDNSResponsePacket(
  395. svrConn net.Conn,
  396. dnsClients *dnsClientCache,
  397. cliAddr net.Addr,
  398. buffer []byte,
  399. length int,
  400. dnsSearch []string) (bool, int, error) {
  401. var drop bool
  402. msg := &dns.Msg{}
  403. if err := msg.Unpack(buffer[:length]); err != nil {
  404. klog.Warningf("Unable to unpack DNS packet. Error is: %v", err)
  405. return drop, length, err
  406. }
  407. // Query - Response bit that specifies whether this message is a query (0) or a response (1).
  408. if msg.MsgHdr.Response == false {
  409. return drop, length, fmt.Errorf("DNS packet should be a response message")
  410. }
  411. // QDCOUNT
  412. if len(msg.Question) != 1 {
  413. klog.V(1).Infof("Number of entries in the response section of the DNS packet is: %d", len(msg.Answer))
  414. return drop, length, nil
  415. }
  416. dnsQType := msg.Question[0].Qtype
  417. dnsQClass := msg.Question[0].Qclass
  418. if packetRequiresDNSSuffix(dnsQType, dnsQClass) {
  419. host, _, err := net.SplitHostPort(cliAddr.String())
  420. if err != nil {
  421. klog.V(1).Infof("Failed to get host from client address: %v", err)
  422. host = cliAddr.String()
  423. }
  424. drop, length = processUnpackedDNSResponsePacket(svrConn, dnsClients, msg, msg.MsgHdr.Rcode, host, dnsQType, buffer, length, dnsSearch)
  425. }
  426. return drop, length, nil
  427. }
  428. func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
  429. var buffer [4096]byte // 4KiB should be enough for most whole-packets
  430. var dnsSearch []string
  431. if isDNSService(service.Port) {
  432. dnsSearch = []string{"", namespaceServiceDomain, serviceDomain, clusterDomain}
  433. execer := exec.New()
  434. ipconfigInterface := ipconfig.New(execer)
  435. suffixList, err := ipconfigInterface.GetDNSSuffixSearchList()
  436. if err == nil {
  437. dnsSearch = append(dnsSearch, suffixList...)
  438. }
  439. }
  440. for {
  441. if !myInfo.isAlive() {
  442. // The service port was closed or replaced.
  443. break
  444. }
  445. // Block until data arrives.
  446. // TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
  447. n, cliAddr, err := udp.ReadFrom(buffer[0:])
  448. if err != nil {
  449. if e, ok := err.(net.Error); ok {
  450. if e.Temporary() {
  451. klog.V(1).Infof("ReadFrom had a temporary failure: %v", err)
  452. continue
  453. }
  454. }
  455. klog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
  456. break
  457. }
  458. // If this is DNS query packet
  459. if isDNSService(service.Port) {
  460. n, err = processDNSQueryPacket(myInfo.dnsClients, cliAddr, buffer[:], n, dnsSearch)
  461. if err != nil {
  462. klog.Errorf("Process DNS query packet failed: %v", err)
  463. }
  464. }
  465. // If this is a client we know already, reuse the connection and goroutine.
  466. svrConn, err := udp.getBackendConn(myInfo.activeClients, myInfo.dnsClients, cliAddr, proxier, service, myInfo.timeout, dnsSearch)
  467. if err != nil {
  468. continue
  469. }
  470. // TODO: It would be nice to let the goroutine handle this write, but we don't
  471. // really want to copy the buffer. We could do a pool of buffers or something.
  472. _, err = svrConn.Write(buffer[0:n])
  473. if err != nil {
  474. if !logTimeout(err) {
  475. klog.Errorf("Write failed: %v", err)
  476. // TODO: Maybe tear down the goroutine for this client/server pair?
  477. }
  478. continue
  479. }
  480. err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
  481. if err != nil {
  482. klog.Errorf("SetDeadline failed: %v", err)
  483. continue
  484. }
  485. }
  486. }
  487. func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients *dnsClientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) (net.Conn, error) {
  488. activeClients.mu.Lock()
  489. defer activeClients.mu.Unlock()
  490. svrConn, found := activeClients.clients[cliAddr.String()]
  491. if !found {
  492. // TODO: This could spin up a new goroutine to make the outbound connection,
  493. // and keep accepting inbound traffic.
  494. klog.V(3).Infof("New UDP connection from %s", cliAddr)
  495. var err error
  496. svrConn, err = tryConnect(service, cliAddr, "udp", proxier)
  497. if err != nil {
  498. return nil, err
  499. }
  500. if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil {
  501. klog.Errorf("SetDeadline failed: %v", err)
  502. return nil, err
  503. }
  504. activeClients.clients[cliAddr.String()] = svrConn
  505. go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) {
  506. defer runtime.HandleCrash()
  507. udp.proxyClient(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch)
  508. }(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch)
  509. }
  510. return svrConn, nil
  511. }
  512. // This function is expected to be called as a goroutine.
  513. // TODO: Track and log bytes copied, like TCP
  514. func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) {
  515. defer svrConn.Close()
  516. var buffer [4096]byte
  517. for {
  518. n, err := svrConn.Read(buffer[0:])
  519. if err != nil {
  520. if !logTimeout(err) {
  521. klog.Errorf("Read failed: %v", err)
  522. }
  523. break
  524. }
  525. drop := false
  526. if isDNSService(service.Port) {
  527. drop, n, err = processDNSResponsePacket(svrConn, dnsClients, cliAddr, buffer[:], n, dnsSearch)
  528. if err != nil {
  529. klog.Errorf("Process DNS response packet failed: %v", err)
  530. }
  531. }
  532. if !drop {
  533. err = svrConn.SetDeadline(time.Now().Add(timeout))
  534. if err != nil {
  535. klog.Errorf("SetDeadline failed: %v", err)
  536. break
  537. }
  538. n, err = udp.WriteTo(buffer[0:n], cliAddr)
  539. if err != nil {
  540. if !logTimeout(err) {
  541. klog.Errorf("WriteTo failed: %v", err)
  542. }
  543. break
  544. }
  545. }
  546. }
  547. activeClients.mu.Lock()
  548. delete(activeClients.clients, cliAddr.String())
  549. activeClients.mu.Unlock()
  550. }