docker_streaming.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dockershim
  14. import (
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "io"
  19. "math"
  20. "time"
  21. dockertypes "github.com/docker/docker/api/types"
  22. "k8s.io/client-go/tools/remotecommand"
  23. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  24. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  25. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  26. "k8s.io/kubernetes/pkg/kubelet/util/ioutils"
  27. utilexec "k8s.io/utils/exec"
  28. "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
  29. )
  30. type streamingRuntime struct {
  31. client libdocker.Interface
  32. execHandler ExecHandler
  33. }
  34. var _ streaming.Runtime = &streamingRuntime{}
  // maxMsgSize is the byte limit (16 MiB) handed to the LimitWriter that wraps
  // each of the stdout and stderr buffers in ExecSync.
  const maxMsgSize = 16 << 20
  36. func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  37. return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
  38. }
  39. // Internal version of Exec adds a timeout.
  40. func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
  41. container, err := checkContainerStatus(r.client, containerID)
  42. if err != nil {
  43. return err
  44. }
  45. return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
  46. }
  47. func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  48. _, err := checkContainerStatus(r.client, containerID)
  49. if err != nil {
  50. return err
  51. }
  52. return attachContainer(r.client, containerID, in, out, errw, tty, resize)
  53. }
  54. func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
  55. if port < 0 || port > math.MaxUint16 {
  56. return fmt.Errorf("invalid port %d", port)
  57. }
  58. return r.portForward(podSandboxID, port, stream)
  59. }
  60. // ExecSync executes a command in the container, and returns the stdout output.
  61. // If command exits with a non-zero exit code, an error is returned.
  62. func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
  63. timeout := time.Duration(req.Timeout) * time.Second
  64. var stdoutBuffer, stderrBuffer bytes.Buffer
  65. err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
  66. nil, // in
  67. ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)),
  68. ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)),
  69. false, // tty
  70. nil, // resize
  71. timeout)
  72. var exitCode int32
  73. if err != nil {
  74. exitError, ok := err.(utilexec.ExitError)
  75. if !ok {
  76. return nil, err
  77. }
  78. exitCode = int32(exitError.ExitStatus())
  79. }
  80. return &runtimeapi.ExecSyncResponse{
  81. Stdout: stdoutBuffer.Bytes(),
  82. Stderr: stderrBuffer.Bytes(),
  83. ExitCode: exitCode,
  84. }, nil
  85. }
  86. // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
  87. func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
  88. if ds.streamingServer == nil {
  89. return nil, streaming.NewErrorStreamingDisabled("exec")
  90. }
  91. _, err := checkContainerStatus(ds.client, req.ContainerId)
  92. if err != nil {
  93. return nil, err
  94. }
  95. return ds.streamingServer.GetExec(req)
  96. }
  97. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
  98. func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
  99. if ds.streamingServer == nil {
  100. return nil, streaming.NewErrorStreamingDisabled("attach")
  101. }
  102. _, err := checkContainerStatus(ds.client, req.ContainerId)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return ds.streamingServer.GetAttach(req)
  107. }
  108. // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
  109. func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
  110. if ds.streamingServer == nil {
  111. return nil, streaming.NewErrorStreamingDisabled("port forward")
  112. }
  113. _, err := checkContainerStatus(ds.client, req.PodSandboxId)
  114. if err != nil {
  115. return nil, err
  116. }
  117. // TODO(tallclair): Verify that ports are exposed.
  118. return ds.streamingServer.GetPortForward(req)
  119. }
  120. func checkContainerStatus(client libdocker.Interface, containerID string) (*dockertypes.ContainerJSON, error) {
  121. container, err := client.InspectContainer(containerID)
  122. if err != nil {
  123. return nil, err
  124. }
  125. if !container.State.Running {
  126. return nil, fmt.Errorf("container not running (%s)", container.ID)
  127. }
  128. return container, nil
  129. }
  130. func attachContainer(client libdocker.Interface, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  131. // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
  132. // call :-( Otherwise, resize events don't get processed and the terminal never resizes.
  133. kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) {
  134. client.ResizeContainerTTY(containerID, uint(size.Height), uint(size.Width))
  135. })
  136. // TODO(random-liu): Do we really use the *Logs* field here?
  137. opts := dockertypes.ContainerAttachOptions{
  138. Stream: true,
  139. Stdin: stdin != nil,
  140. Stdout: stdout != nil,
  141. Stderr: stderr != nil,
  142. }
  143. sopts := libdocker.StreamOptions{
  144. InputStream: stdin,
  145. OutputStream: stdout,
  146. ErrorStream: stderr,
  147. RawTerminal: tty,
  148. }
  149. return client.AttachToContainer(containerID, opts, sopts)
  150. }