websocket.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package portforward
  14. import (
  15. "encoding/binary"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "k8s.io/klog"
  24. "k8s.io/apimachinery/pkg/types"
  25. "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apiserver/pkg/server/httplog"
  27. "k8s.io/apiserver/pkg/util/wsstream"
  28. api "k8s.io/kubernetes/pkg/apis/core"
  29. )
  30. const (
  31. dataChannel = iota
  32. errorChannel
  33. v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
  34. v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
  35. )
  36. // V4Options contains details about which streams are required for port
  37. // forwarding.
  38. // All fields included in V4Options need to be expressed explicitly in the
  39. // CRI (k8s.io/cri-api/pkg/apis/{version}/api.proto) PortForwardRequest.
  40. type V4Options struct {
  41. Ports []int32
  42. }
  43. // NewV4Options creates a new options from the Request.
  44. func NewV4Options(req *http.Request) (*V4Options, error) {
  45. if !wsstream.IsWebSocketRequest(req) {
  46. return &V4Options{}, nil
  47. }
  48. portStrings := req.URL.Query()[api.PortHeader]
  49. if len(portStrings) == 0 {
  50. return nil, fmt.Errorf("query parameter %q is required", api.PortHeader)
  51. }
  52. ports := make([]int32, 0, len(portStrings))
  53. for _, portString := range portStrings {
  54. if len(portString) == 0 {
  55. return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader)
  56. }
  57. for _, p := range strings.Split(portString, ",") {
  58. port, err := strconv.ParseUint(p, 10, 16)
  59. if err != nil {
  60. return nil, fmt.Errorf("unable to parse %q as a port: %v", portString, err)
  61. }
  62. if port < 1 {
  63. return nil, fmt.Errorf("port %q must be > 0", portString)
  64. }
  65. ports = append(ports, int32(port))
  66. }
  67. }
  68. return &V4Options{
  69. Ports: ports,
  70. }, nil
  71. }
  72. // BuildV4Options returns a V4Options based on the given information.
  73. func BuildV4Options(ports []int32) (*V4Options, error) {
  74. return &V4Options{Ports: ports}, nil
  75. }
  76. // handleWebSocketStreams handles requests to forward ports to a pod via
  77. // a PortForwarder. A pair of streams are created per port (DATA n,
  78. // ERROR n+1). The associated port is written to each stream as a unsigned 16
  79. // bit integer in little endian format.
  80. func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
  81. channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2)
  82. for i := 0; i < len(opts.Ports); i++ {
  83. channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel)
  84. }
  85. conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
  86. "": {
  87. Binary: true,
  88. Channels: channels,
  89. },
  90. v4BinaryWebsocketProtocol: {
  91. Binary: true,
  92. Channels: channels,
  93. },
  94. v4Base64WebsocketProtocol: {
  95. Binary: false,
  96. Channels: channels,
  97. },
  98. })
  99. conn.SetIdleTimeout(idleTimeout)
  100. _, streams, err := conn.Open(httplog.Unlogged(w), req)
  101. if err != nil {
  102. err = fmt.Errorf("Unable to upgrade websocket connection: %v", err)
  103. return err
  104. }
  105. defer conn.Close()
  106. streamPairs := make([]*websocketStreamPair, len(opts.Ports))
  107. for i := range streamPairs {
  108. streamPair := websocketStreamPair{
  109. port: opts.Ports[i],
  110. dataStream: streams[i*2+dataChannel],
  111. errorStream: streams[i*2+errorChannel],
  112. }
  113. streamPairs[i] = &streamPair
  114. portBytes := make([]byte, 2)
  115. // port is always positive so conversion is allowable
  116. binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port))
  117. streamPair.dataStream.Write(portBytes)
  118. streamPair.errorStream.Write(portBytes)
  119. }
  120. h := &websocketStreamHandler{
  121. conn: conn,
  122. streamPairs: streamPairs,
  123. pod: podName,
  124. uid: uid,
  125. forwarder: portForwarder,
  126. }
  127. h.run()
  128. return nil
  129. }
  130. // websocketStreamPair represents the error and data streams for a port
  131. // forwarding request.
  132. type websocketStreamPair struct {
  133. port int32
  134. dataStream io.ReadWriteCloser
  135. errorStream io.WriteCloser
  136. }
  137. // websocketStreamHandler is capable of processing a single port forward
  138. // request over a websocket connection
  139. type websocketStreamHandler struct {
  140. conn *wsstream.Conn
  141. streamPairs []*websocketStreamPair
  142. pod string
  143. uid types.UID
  144. forwarder PortForwarder
  145. }
  146. // run invokes the websocketStreamHandler's forwarder.PortForward
  147. // function for the given stream pair.
  148. func (h *websocketStreamHandler) run() {
  149. wg := sync.WaitGroup{}
  150. wg.Add(len(h.streamPairs))
  151. for _, pair := range h.streamPairs {
  152. p := pair
  153. go func() {
  154. defer wg.Done()
  155. h.portForward(p)
  156. }()
  157. }
  158. wg.Wait()
  159. }
  160. func (h *websocketStreamHandler) portForward(p *websocketStreamPair) {
  161. defer p.dataStream.Close()
  162. defer p.errorStream.Close()
  163. klog.V(5).Infof("(conn=%p) invoking forwarder.PortForward for port %d", h.conn, p.port)
  164. err := h.forwarder.PortForward(h.pod, h.uid, p.port, p.dataStream)
  165. klog.V(5).Infof("(conn=%p) done invoking forwarder.PortForward for port %d", h.conn, p.port)
  166. if err != nil {
  167. msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err)
  168. runtime.HandleError(msg)
  169. fmt.Fprint(p.errorStream, msg.Error())
  170. }
  171. }