server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package streaming
  14. import (
  15. "crypto/tls"
  16. "errors"
  17. "io"
  18. "net"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "time"
  23. "google.golang.org/grpc/codes"
  24. "google.golang.org/grpc/status"
  25. restful "github.com/emicklei/go-restful"
  26. "k8s.io/apimachinery/pkg/types"
  27. remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
  28. "k8s.io/client-go/tools/remotecommand"
  29. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  30. "k8s.io/kubernetes/pkg/kubelet/server/portforward"
  31. remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
  32. )
  33. // Server is the library interface to serve the stream requests.
  34. type Server interface {
  35. http.Handler
  36. // Get the serving URL for the requests.
  37. // Requests must not be nil. Responses may be nil iff an error is returned.
  38. GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
  39. GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
  40. GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
  41. // Start the server.
  42. // addr is the address to serve on (address:port) stayUp indicates whether the server should
  43. // listen until Stop() is called, or automatically stop after all expected connections are
  44. // closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count.
  45. // Function does not return until the server is stopped.
  46. Start(stayUp bool) error
  47. // Stop the server, and terminate any open connections.
  48. Stop() error
  49. }
  50. // Runtime is the interface to execute the commands and provide the streams.
  51. type Runtime interface {
  52. Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
  53. Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
  54. PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
  55. }
  // Config holds the options used for running the stream server.
  type Config struct {
  	// Addr is the host:port address the server will listen on.
  	Addr string

  	// BaseURL is the optional base URL used to construct streaming URLs.
  	// If it is nil, a base URL is built from the serve address.
  	// Note that for port "0", the URL port will be set to actual port in use.
  	BaseURL *url.URL

  	// StreamIdleTimeout is how long idle connections are left open.
  	StreamIdleTimeout time.Duration

  	// StreamCreationTimeout is how long to wait for clients to create
  	// streams. Only used for SPDY streaming.
  	StreamCreationTimeout time.Duration

  	// SupportedRemoteCommandProtocols lists the streaming protocols the server
  	// supports (understands and permits) for exec and attach. See
  	// k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for
  	// available protocols. Only used for SPDY streaming.
  	SupportedRemoteCommandProtocols []string

  	// SupportedPortForwardProtocols lists the streaming protocols the server
  	// supports (understands and permits) for port forwarding. See
  	// k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for
  	// available protocols. Only used for SPDY streaming.
  	SupportedPortForwardProtocols []string

  	// TLSConfig is the config for serving over TLS. If nil, TLS will not be
  	// used.
  	TLSConfig *tls.Config
  }
  79. // DefaultConfig provides default values for server Config. The DefaultConfig is partial, so
  80. // some fields like Addr must still be provided.
  81. var DefaultConfig = Config{
  82. StreamIdleTimeout: 4 * time.Hour,
  83. StreamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,
  84. SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols,
  85. SupportedPortForwardProtocols: portforward.SupportedProtocols,
  86. }
  87. // NewServer creates a new Server for stream requests.
  88. // TODO(tallclair): Add auth(n/z) interface & handling.
  89. func NewServer(config Config, runtime Runtime) (Server, error) {
  90. s := &server{
  91. config: config,
  92. runtime: &criAdapter{runtime},
  93. cache: newRequestCache(),
  94. }
  95. if s.config.BaseURL == nil {
  96. s.config.BaseURL = &url.URL{
  97. Scheme: "http",
  98. Host: s.config.Addr,
  99. }
  100. if s.config.TLSConfig != nil {
  101. s.config.BaseURL.Scheme = "https"
  102. }
  103. }
  104. ws := &restful.WebService{}
  105. endpoints := []struct {
  106. path string
  107. handler restful.RouteFunction
  108. }{
  109. {"/exec/{token}", s.serveExec},
  110. {"/attach/{token}", s.serveAttach},
  111. {"/portforward/{token}", s.servePortForward},
  112. }
  113. // If serving relative to a base path, set that here.
  114. pathPrefix := path.Dir(s.config.BaseURL.Path)
  115. for _, e := range endpoints {
  116. for _, method := range []string{"GET", "POST"} {
  117. ws.Route(ws.
  118. Method(method).
  119. Path(path.Join(pathPrefix, e.path)).
  120. To(e.handler))
  121. }
  122. }
  123. handler := restful.NewContainer()
  124. handler.Add(ws)
  125. s.handler = handler
  126. s.server = &http.Server{
  127. Addr: s.config.Addr,
  128. Handler: s.handler,
  129. TLSConfig: s.config.TLSConfig,
  130. }
  131. return s, nil
  132. }
  133. type server struct {
  134. config Config
  135. runtime *criAdapter
  136. handler http.Handler
  137. cache *requestCache
  138. server *http.Server
  139. }
  140. func validateExecRequest(req *runtimeapi.ExecRequest) error {
  141. if req.ContainerId == "" {
  142. return status.Errorf(codes.InvalidArgument, "missing required container_id")
  143. }
  144. if req.Tty && req.Stderr {
  145. // If TTY is set, stderr cannot be true because multiplexing is not
  146. // supported.
  147. return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
  148. }
  149. if !req.Stdin && !req.Stdout && !req.Stderr {
  150. return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, or stderr must be set")
  151. }
  152. return nil
  153. }
  154. func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
  155. if err := validateExecRequest(req); err != nil {
  156. return nil, err
  157. }
  158. token, err := s.cache.Insert(req)
  159. if err != nil {
  160. return nil, err
  161. }
  162. return &runtimeapi.ExecResponse{
  163. Url: s.buildURL("exec", token),
  164. }, nil
  165. }
  166. func validateAttachRequest(req *runtimeapi.AttachRequest) error {
  167. if req.ContainerId == "" {
  168. return status.Errorf(codes.InvalidArgument, "missing required container_id")
  169. }
  170. if req.Tty && req.Stderr {
  171. // If TTY is set, stderr cannot be true because multiplexing is not
  172. // supported.
  173. return status.Errorf(codes.InvalidArgument, "tty and stderr cannot both be true")
  174. }
  175. if !req.Stdin && !req.Stdout && !req.Stderr {
  176. return status.Errorf(codes.InvalidArgument, "one of stdin, stdout, and stderr must be set")
  177. }
  178. return nil
  179. }
  180. func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
  181. if err := validateAttachRequest(req); err != nil {
  182. return nil, err
  183. }
  184. token, err := s.cache.Insert(req)
  185. if err != nil {
  186. return nil, err
  187. }
  188. return &runtimeapi.AttachResponse{
  189. Url: s.buildURL("attach", token),
  190. }, nil
  191. }
  192. func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
  193. if req.PodSandboxId == "" {
  194. return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
  195. }
  196. token, err := s.cache.Insert(req)
  197. if err != nil {
  198. return nil, err
  199. }
  200. return &runtimeapi.PortForwardResponse{
  201. Url: s.buildURL("portforward", token),
  202. }, nil
  203. }
  204. func (s *server) Start(stayUp bool) error {
  205. if !stayUp {
  206. // TODO(tallclair): Implement this.
  207. return errors.New("stayUp=false is not yet implemented")
  208. }
  209. listener, err := net.Listen("tcp", s.config.Addr)
  210. if err != nil {
  211. return err
  212. }
  213. // Use the actual address as baseURL host. This handles the "0" port case.
  214. s.config.BaseURL.Host = listener.Addr().String()
  215. if s.config.TLSConfig != nil {
  216. return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
  217. }
  218. return s.server.Serve(listener)
  219. }
  220. func (s *server) Stop() error {
  221. return s.server.Close()
  222. }
  223. func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  224. s.handler.ServeHTTP(w, r)
  225. }
  226. func (s *server) buildURL(method, token string) string {
  227. return s.config.BaseURL.ResolveReference(&url.URL{
  228. Path: path.Join(method, token),
  229. }).String()
  230. }
  231. func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
  232. token := req.PathParameter("token")
  233. cachedRequest, ok := s.cache.Consume(token)
  234. if !ok {
  235. http.NotFound(resp.ResponseWriter, req.Request)
  236. return
  237. }
  238. exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
  239. if !ok {
  240. http.NotFound(resp.ResponseWriter, req.Request)
  241. return
  242. }
  243. streamOpts := &remotecommandserver.Options{
  244. Stdin: exec.Stdin,
  245. Stdout: exec.Stdout,
  246. Stderr: exec.Stderr,
  247. TTY: exec.Tty,
  248. }
  249. remotecommandserver.ServeExec(
  250. resp.ResponseWriter,
  251. req.Request,
  252. s.runtime,
  253. "", // unused: podName
  254. "", // unusued: podUID
  255. exec.ContainerId,
  256. exec.Cmd,
  257. streamOpts,
  258. s.config.StreamIdleTimeout,
  259. s.config.StreamCreationTimeout,
  260. s.config.SupportedRemoteCommandProtocols)
  261. }
  262. func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
  263. token := req.PathParameter("token")
  264. cachedRequest, ok := s.cache.Consume(token)
  265. if !ok {
  266. http.NotFound(resp.ResponseWriter, req.Request)
  267. return
  268. }
  269. attach, ok := cachedRequest.(*runtimeapi.AttachRequest)
  270. if !ok {
  271. http.NotFound(resp.ResponseWriter, req.Request)
  272. return
  273. }
  274. streamOpts := &remotecommandserver.Options{
  275. Stdin: attach.Stdin,
  276. Stdout: attach.Stdout,
  277. Stderr: attach.Stderr,
  278. TTY: attach.Tty,
  279. }
  280. remotecommandserver.ServeAttach(
  281. resp.ResponseWriter,
  282. req.Request,
  283. s.runtime,
  284. "", // unused: podName
  285. "", // unusued: podUID
  286. attach.ContainerId,
  287. streamOpts,
  288. s.config.StreamIdleTimeout,
  289. s.config.StreamCreationTimeout,
  290. s.config.SupportedRemoteCommandProtocols)
  291. }
  292. func (s *server) servePortForward(req *restful.Request, resp *restful.Response) {
  293. token := req.PathParameter("token")
  294. cachedRequest, ok := s.cache.Consume(token)
  295. if !ok {
  296. http.NotFound(resp.ResponseWriter, req.Request)
  297. return
  298. }
  299. pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest)
  300. if !ok {
  301. http.NotFound(resp.ResponseWriter, req.Request)
  302. return
  303. }
  304. portForwardOptions, err := portforward.BuildV4Options(pf.Port)
  305. if err != nil {
  306. resp.WriteError(http.StatusBadRequest, err)
  307. return
  308. }
  309. portforward.ServePortForward(
  310. resp.ResponseWriter,
  311. req.Request,
  312. s.runtime,
  313. pf.PodSandboxId,
  314. "", // unused: podUID
  315. portForwardOptions,
  316. s.config.StreamIdleTimeout,
  317. s.config.StreamCreationTimeout,
  318. s.config.SupportedPortForwardProtocols)
  319. }
  320. // criAdapter wraps the Runtime functions to conform to the remotecommand interfaces.
  321. // The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name.
  322. type criAdapter struct {
  323. Runtime
  324. }
  325. var _ remotecommandserver.Executor = &criAdapter{}
  326. var _ remotecommandserver.Attacher = &criAdapter{}
  327. var _ portforward.PortForwarder = &criAdapter{}
  328. func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
  329. return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
  330. }
  331. func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  332. return a.Runtime.Attach(container, in, out, err, tty, resize)
  333. }
  334. func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
  335. return a.Runtime.PortForward(podName, port, stream)
  336. }