httpstream.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package remotecommand
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "time"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/util/httpstream"
  24. "k8s.io/apimachinery/pkg/util/httpstream/spdy"
  25. remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
  26. "k8s.io/apimachinery/pkg/util/runtime"
  27. "k8s.io/apiserver/pkg/util/wsstream"
  28. "k8s.io/client-go/tools/remotecommand"
  29. api "k8s.io/kubernetes/pkg/apis/core"
  30. "k8s.io/klog"
  31. )
  32. // Options contains details about which streams are required for
  33. // remote command execution.
  34. type Options struct {
  35. Stdin bool
  36. Stdout bool
  37. Stderr bool
  38. TTY bool
  39. }
  40. // NewOptions creates a new Options from the Request.
  41. func NewOptions(req *http.Request) (*Options, error) {
  42. tty := req.FormValue(api.ExecTTYParam) == "1"
  43. stdin := req.FormValue(api.ExecStdinParam) == "1"
  44. stdout := req.FormValue(api.ExecStdoutParam) == "1"
  45. stderr := req.FormValue(api.ExecStderrParam) == "1"
  46. if tty && stderr {
  47. // TODO: make this an error before we reach this method
  48. klog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
  49. stderr = false
  50. }
  51. if !stdin && !stdout && !stderr {
  52. return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
  53. }
  54. return &Options{
  55. Stdin: stdin,
  56. Stdout: stdout,
  57. Stderr: stderr,
  58. TTY: tty,
  59. }, nil
  60. }
  61. // context contains the connection and streams used when
  62. // forwarding an attach or execute session into a container.
  63. type context struct {
  64. conn io.Closer
  65. stdinStream io.ReadCloser
  66. stdoutStream io.WriteCloser
  67. stderrStream io.WriteCloser
  68. writeStatus func(status *apierrors.StatusError) error
  69. resizeStream io.ReadCloser
  70. resizeChan chan remotecommand.TerminalSize
  71. tty bool
  72. }
  73. // streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
  74. // enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
  75. // replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
  76. // received and right after, the connection gets closed).
  77. type streamAndReply struct {
  78. httpstream.Stream
  79. replySent <-chan struct{}
  80. }
  81. // waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
  82. // an empty struct to the notify channel.
  83. func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
  84. select {
  85. case <-replySent:
  86. notify <- struct{}{}
  87. case <-stop:
  88. }
  89. }
  90. func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
  91. var ctx *context
  92. var ok bool
  93. if wsstream.IsWebSocketRequest(req) {
  94. ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
  95. } else {
  96. ctx, ok = createHTTPStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
  97. }
  98. if !ok {
  99. return nil, false
  100. }
  101. if ctx.resizeStream != nil {
  102. ctx.resizeChan = make(chan remotecommand.TerminalSize)
  103. go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
  104. }
  105. return ctx, true
  106. }
  107. func createHTTPStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
  108. protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
  109. if err != nil {
  110. http.Error(w, err.Error(), http.StatusBadRequest)
  111. return nil, false
  112. }
  113. streamCh := make(chan streamAndReply)
  114. upgrader := spdy.NewResponseUpgrader()
  115. conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
  116. streamCh <- streamAndReply{Stream: stream, replySent: replySent}
  117. return nil
  118. })
  119. // from this point on, we can no longer call methods on response
  120. if conn == nil {
  121. // The upgrader is responsible for notifying the client of any errors that
  122. // occurred during upgrading. All we can do is return here at this point
  123. // if we weren't successful in upgrading.
  124. return nil, false
  125. }
  126. conn.SetIdleTimeout(idleTimeout)
  127. var handler protocolHandler
  128. switch protocol {
  129. case remotecommandconsts.StreamProtocolV4Name:
  130. handler = &v4ProtocolHandler{}
  131. case remotecommandconsts.StreamProtocolV3Name:
  132. handler = &v3ProtocolHandler{}
  133. case remotecommandconsts.StreamProtocolV2Name:
  134. handler = &v2ProtocolHandler{}
  135. case "":
  136. klog.V(4).Infof("Client did not request protocol negotiation. Falling back to %q", remotecommandconsts.StreamProtocolV1Name)
  137. fallthrough
  138. case remotecommandconsts.StreamProtocolV1Name:
  139. handler = &v1ProtocolHandler{}
  140. }
  141. // count the streams client asked for, starting with 1
  142. expectedStreams := 1
  143. if opts.Stdin {
  144. expectedStreams++
  145. }
  146. if opts.Stdout {
  147. expectedStreams++
  148. }
  149. if opts.Stderr {
  150. expectedStreams++
  151. }
  152. if opts.TTY && handler.supportsTerminalResizing() {
  153. expectedStreams++
  154. }
  155. expired := time.NewTimer(streamCreationTimeout)
  156. defer expired.Stop()
  157. ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C)
  158. if err != nil {
  159. runtime.HandleError(err)
  160. return nil, false
  161. }
  162. ctx.conn = conn
  163. ctx.tty = opts.TTY
  164. return ctx, true
  165. }
  166. type protocolHandler interface {
  167. // waitForStreams waits for the expected streams or a timeout, returning a
  168. // remoteCommandContext if all the streams were received, or an error if not.
  169. waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
  170. // supportsTerminalResizing returns true if the protocol handler supports terminal resizing
  171. supportsTerminalResizing() bool
  172. }
  173. // v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
  174. // in from v3 in the error stream format using an json-marshaled metav1.Status which carries
  175. // the process' exit code.
  176. type v4ProtocolHandler struct{}
  177. func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  178. ctx := &context{}
  179. receivedStreams := 0
  180. replyChan := make(chan struct{})
  181. stop := make(chan struct{})
  182. defer close(stop)
  183. WaitForStreams:
  184. for {
  185. select {
  186. case stream := <-streams:
  187. streamType := stream.Headers().Get(api.StreamType)
  188. switch streamType {
  189. case api.StreamTypeError:
  190. ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
  191. go waitStreamReply(stream.replySent, replyChan, stop)
  192. case api.StreamTypeStdin:
  193. ctx.stdinStream = stream
  194. go waitStreamReply(stream.replySent, replyChan, stop)
  195. case api.StreamTypeStdout:
  196. ctx.stdoutStream = stream
  197. go waitStreamReply(stream.replySent, replyChan, stop)
  198. case api.StreamTypeStderr:
  199. ctx.stderrStream = stream
  200. go waitStreamReply(stream.replySent, replyChan, stop)
  201. case api.StreamTypeResize:
  202. ctx.resizeStream = stream
  203. go waitStreamReply(stream.replySent, replyChan, stop)
  204. default:
  205. runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
  206. }
  207. case <-replyChan:
  208. receivedStreams++
  209. if receivedStreams == expectedStreams {
  210. break WaitForStreams
  211. }
  212. case <-expired:
  213. // TODO find a way to return the error to the user. Maybe use a separate
  214. // stream to report errors?
  215. return nil, errors.New("timed out waiting for client to create streams")
  216. }
  217. }
  218. return ctx, nil
  219. }
  220. // supportsTerminalResizing returns true because v4ProtocolHandler supports it
  221. func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
  222. // v3ProtocolHandler implements the V3 protocol version for streaming command execution.
  223. type v3ProtocolHandler struct{}
  224. func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  225. ctx := &context{}
  226. receivedStreams := 0
  227. replyChan := make(chan struct{})
  228. stop := make(chan struct{})
  229. defer close(stop)
  230. WaitForStreams:
  231. for {
  232. select {
  233. case stream := <-streams:
  234. streamType := stream.Headers().Get(api.StreamType)
  235. switch streamType {
  236. case api.StreamTypeError:
  237. ctx.writeStatus = v1WriteStatusFunc(stream)
  238. go waitStreamReply(stream.replySent, replyChan, stop)
  239. case api.StreamTypeStdin:
  240. ctx.stdinStream = stream
  241. go waitStreamReply(stream.replySent, replyChan, stop)
  242. case api.StreamTypeStdout:
  243. ctx.stdoutStream = stream
  244. go waitStreamReply(stream.replySent, replyChan, stop)
  245. case api.StreamTypeStderr:
  246. ctx.stderrStream = stream
  247. go waitStreamReply(stream.replySent, replyChan, stop)
  248. case api.StreamTypeResize:
  249. ctx.resizeStream = stream
  250. go waitStreamReply(stream.replySent, replyChan, stop)
  251. default:
  252. runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
  253. }
  254. case <-replyChan:
  255. receivedStreams++
  256. if receivedStreams == expectedStreams {
  257. break WaitForStreams
  258. }
  259. case <-expired:
  260. // TODO find a way to return the error to the user. Maybe use a separate
  261. // stream to report errors?
  262. return nil, errors.New("timed out waiting for client to create streams")
  263. }
  264. }
  265. return ctx, nil
  266. }
  267. // supportsTerminalResizing returns true because v3ProtocolHandler supports it
  268. func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
  269. // v2ProtocolHandler implements the V2 protocol version for streaming command execution.
  270. type v2ProtocolHandler struct{}
  271. func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  272. ctx := &context{}
  273. receivedStreams := 0
  274. replyChan := make(chan struct{})
  275. stop := make(chan struct{})
  276. defer close(stop)
  277. WaitForStreams:
  278. for {
  279. select {
  280. case stream := <-streams:
  281. streamType := stream.Headers().Get(api.StreamType)
  282. switch streamType {
  283. case api.StreamTypeError:
  284. ctx.writeStatus = v1WriteStatusFunc(stream)
  285. go waitStreamReply(stream.replySent, replyChan, stop)
  286. case api.StreamTypeStdin:
  287. ctx.stdinStream = stream
  288. go waitStreamReply(stream.replySent, replyChan, stop)
  289. case api.StreamTypeStdout:
  290. ctx.stdoutStream = stream
  291. go waitStreamReply(stream.replySent, replyChan, stop)
  292. case api.StreamTypeStderr:
  293. ctx.stderrStream = stream
  294. go waitStreamReply(stream.replySent, replyChan, stop)
  295. default:
  296. runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
  297. }
  298. case <-replyChan:
  299. receivedStreams++
  300. if receivedStreams == expectedStreams {
  301. break WaitForStreams
  302. }
  303. case <-expired:
  304. // TODO find a way to return the error to the user. Maybe use a separate
  305. // stream to report errors?
  306. return nil, errors.New("timed out waiting for client to create streams")
  307. }
  308. }
  309. return ctx, nil
  310. }
  311. // supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
  312. func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
  313. // v1ProtocolHandler implements the V1 protocol version for streaming command execution.
  314. type v1ProtocolHandler struct{}
  315. func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  316. ctx := &context{}
  317. receivedStreams := 0
  318. replyChan := make(chan struct{})
  319. stop := make(chan struct{})
  320. defer close(stop)
  321. WaitForStreams:
  322. for {
  323. select {
  324. case stream := <-streams:
  325. streamType := stream.Headers().Get(api.StreamType)
  326. switch streamType {
  327. case api.StreamTypeError:
  328. ctx.writeStatus = v1WriteStatusFunc(stream)
  329. // This defer statement shouldn't be here, but due to previous refactoring, it ended up in
  330. // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
  331. // the v2ProtocolHandler.
  332. defer stream.Reset()
  333. go waitStreamReply(stream.replySent, replyChan, stop)
  334. case api.StreamTypeStdin:
  335. ctx.stdinStream = stream
  336. go waitStreamReply(stream.replySent, replyChan, stop)
  337. case api.StreamTypeStdout:
  338. ctx.stdoutStream = stream
  339. go waitStreamReply(stream.replySent, replyChan, stop)
  340. case api.StreamTypeStderr:
  341. ctx.stderrStream = stream
  342. go waitStreamReply(stream.replySent, replyChan, stop)
  343. default:
  344. runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
  345. }
  346. case <-replyChan:
  347. receivedStreams++
  348. if receivedStreams == expectedStreams {
  349. break WaitForStreams
  350. }
  351. case <-expired:
  352. // TODO find a way to return the error to the user. Maybe use a separate
  353. // stream to report errors?
  354. return nil, errors.New("timed out waiting for client to create streams")
  355. }
  356. }
  357. if ctx.stdinStream != nil {
  358. ctx.stdinStream.Close()
  359. }
  360. return ctx, nil
  361. }
  362. // supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
  363. func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
  364. func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) {
  365. defer runtime.HandleCrash()
  366. decoder := json.NewDecoder(stream)
  367. for {
  368. size := remotecommand.TerminalSize{}
  369. if err := decoder.Decode(&size); err != nil {
  370. break
  371. }
  372. channel <- size
  373. }
  374. }
  375. func v1WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
  376. return func(status *apierrors.StatusError) error {
  377. if status.Status().Status == metav1.StatusSuccess {
  378. return nil // send error messages
  379. }
  380. _, err := stream.Write([]byte(status.Error()))
  381. return err
  382. }
  383. }
  384. // v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
  385. // as json in the error channel.
  386. func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
  387. return func(status *apierrors.StatusError) error {
  388. bs, err := json.Marshal(status.Status())
  389. if err != nil {
  390. return err
  391. }
  392. _, err = stream.Write(bs)
  393. return err
  394. }
  395. }