client.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "context"
  16. "errors"
  17. "io"
  18. "math/rand"
  19. "net"
  20. "sync"
  21. "time"
  22. "google.golang.org/grpc"
  23. "k8s.io/klog"
  24. "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
  25. )
  26. // Tunnel provides ability to dial a connection through a tunnel.
  27. type Tunnel interface {
  28. // Dial connects to the address on the named network, similar to
  29. // what net.Dial does. The only supported protocol is tcp.
  30. Dial(protocol, address string) (net.Conn, error)
  31. }
  32. type dialResult struct {
  33. err string
  34. connid int64
  35. }
  36. // grpcTunnel implements Tunnel
  37. type grpcTunnel struct {
  38. stream client.ProxyService_ProxyClient
  39. pendingDial map[int64]chan<- dialResult
  40. conns map[int64]*conn
  41. pendingDialLock sync.RWMutex
  42. connsLock sync.RWMutex
  43. }
  44. // CreateGrpcTunnel creates a Tunnel to dial to a remote server through a
  45. // gRPC based proxy service.
  46. func CreateGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel, error) {
  47. c, err := grpc.Dial(address, opts...)
  48. if err != nil {
  49. return nil, err
  50. }
  51. grpcClient := client.NewProxyServiceClient(c)
  52. stream, err := grpcClient.Proxy(context.Background())
  53. if err != nil {
  54. return nil, err
  55. }
  56. tunnel := &grpcTunnel{
  57. stream: stream,
  58. pendingDial: make(map[int64]chan<- dialResult),
  59. conns: make(map[int64]*conn),
  60. }
  61. go tunnel.serve()
  62. return tunnel, nil
  63. }
  64. func (t *grpcTunnel) serve() {
  65. for {
  66. pkt, err := t.stream.Recv()
  67. if err == io.EOF {
  68. return
  69. }
  70. if err != nil || pkt == nil {
  71. klog.Warningf("stream read error: %v", err)
  72. return
  73. }
  74. klog.Infof("[tracing] recv packet %+v", pkt)
  75. switch pkt.Type {
  76. case client.PacketType_DIAL_RSP:
  77. resp := pkt.GetDialResponse()
  78. t.pendingDialLock.RLock()
  79. ch, ok := t.pendingDial[resp.Random]
  80. t.pendingDialLock.RUnlock()
  81. if !ok {
  82. klog.Warning("DialResp not recognized; dropped")
  83. } else {
  84. ch <- dialResult{
  85. err: resp.Error,
  86. connid: resp.ConnectID,
  87. }
  88. }
  89. case client.PacketType_DATA:
  90. resp := pkt.GetData()
  91. // TODO: flow control
  92. t.connsLock.RLock()
  93. conn, ok := t.conns[resp.ConnectID]
  94. t.connsLock.RUnlock()
  95. if ok {
  96. conn.readCh <- resp.Data
  97. } else {
  98. klog.Warningf("connection id %d not recognized", resp.ConnectID)
  99. }
  100. case client.PacketType_CLOSE_RSP:
  101. resp := pkt.GetCloseResponse()
  102. t.connsLock.RLock()
  103. conn, ok := t.conns[resp.ConnectID]
  104. t.connsLock.RUnlock()
  105. if ok {
  106. close(conn.readCh)
  107. conn.closeCh <- resp.Error
  108. close(conn.closeCh)
  109. t.connsLock.Lock()
  110. delete(t.conns, resp.ConnectID)
  111. t.connsLock.Unlock()
  112. } else {
  113. klog.Warningf("connection id %d not recognized", resp.ConnectID)
  114. }
  115. }
  116. }
  117. }
  118. // Dial connects to the address on the named network, similar to
  119. // what net.Dial does. The only supported protocol is tcp.
  120. func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
  121. if protocol != "tcp" {
  122. return nil, errors.New("protocol not supported")
  123. }
  124. random := rand.Int63()
  125. resCh := make(chan dialResult)
  126. t.pendingDialLock.Lock()
  127. t.pendingDial[random] = resCh
  128. t.pendingDialLock.Unlock()
  129. defer func() {
  130. t.pendingDialLock.Lock()
  131. delete(t.pendingDial, random)
  132. t.pendingDialLock.Unlock()
  133. }()
  134. req := &client.Packet{
  135. Type: client.PacketType_DIAL_REQ,
  136. Payload: &client.Packet_DialRequest{
  137. DialRequest: &client.DialRequest{
  138. Protocol: protocol,
  139. Address: address,
  140. Random: random,
  141. },
  142. },
  143. }
  144. klog.Infof("[tracing] send packet %+v", req)
  145. err := t.stream.Send(req)
  146. if err != nil {
  147. return nil, err
  148. }
  149. klog.Info("DIAL_REQ sent to proxy server")
  150. c := &conn{stream: t.stream}
  151. select {
  152. case res := <-resCh:
  153. if res.err != "" {
  154. return nil, errors.New(res.err)
  155. }
  156. c.connID = res.connid
  157. c.readCh = make(chan []byte, 10)
  158. c.closeCh = make(chan string)
  159. t.connsLock.Lock()
  160. t.conns[res.connid] = c
  161. t.connsLock.Unlock()
  162. case <-time.After(30 * time.Second):
  163. return nil, errors.New("dial timeout")
  164. }
  165. return c, nil
  166. }