docker_streaming.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dockershim
  14. import (
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "io"
  19. "math"
  20. "time"
  21. dockertypes "github.com/docker/docker/api/types"
  22. "k8s.io/client-go/tools/remotecommand"
  23. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
  24. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  25. "k8s.io/kubernetes/pkg/kubelet/server/streaming"
  26. "k8s.io/kubernetes/pkg/kubelet/util/ioutils"
  27. utilexec "k8s.io/utils/exec"
  28. "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
  29. )
  30. type streamingRuntime struct {
  31. client libdocker.Interface
  32. execHandler ExecHandler
  33. }
  34. var _ streaming.Runtime = &streamingRuntime{}
  35. func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  36. return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
  37. }
  38. // Internal version of Exec adds a timeout.
  39. func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
  40. container, err := checkContainerStatus(r.client, containerID)
  41. if err != nil {
  42. return err
  43. }
  44. return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
  45. }
  46. func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  47. _, err := checkContainerStatus(r.client, containerID)
  48. if err != nil {
  49. return err
  50. }
  51. return attachContainer(r.client, containerID, in, out, errw, tty, resize)
  52. }
  53. func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
  54. if port < 0 || port > math.MaxUint16 {
  55. return fmt.Errorf("invalid port %d", port)
  56. }
  57. return r.portForward(podSandboxID, port, stream)
  58. }
  59. // ExecSync executes a command in the container, and returns the stdout output.
  60. // If command exits with a non-zero exit code, an error is returned.
  61. func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
  62. timeout := time.Duration(req.Timeout) * time.Second
  63. var stdoutBuffer, stderrBuffer bytes.Buffer
  64. err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
  65. nil, // in
  66. ioutils.WriteCloserWrapper(&stdoutBuffer),
  67. ioutils.WriteCloserWrapper(&stderrBuffer),
  68. false, // tty
  69. nil, // resize
  70. timeout)
  71. var exitCode int32
  72. if err != nil {
  73. exitError, ok := err.(utilexec.ExitError)
  74. if !ok {
  75. return nil, err
  76. }
  77. exitCode = int32(exitError.ExitStatus())
  78. }
  79. return &runtimeapi.ExecSyncResponse{
  80. Stdout: stdoutBuffer.Bytes(),
  81. Stderr: stderrBuffer.Bytes(),
  82. ExitCode: exitCode,
  83. }, nil
  84. }
  85. // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
  86. func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
  87. if ds.streamingServer == nil {
  88. return nil, streaming.NewErrorStreamingDisabled("exec")
  89. }
  90. _, err := checkContainerStatus(ds.client, req.ContainerId)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return ds.streamingServer.GetExec(req)
  95. }
  96. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
  97. func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
  98. if ds.streamingServer == nil {
  99. return nil, streaming.NewErrorStreamingDisabled("attach")
  100. }
  101. _, err := checkContainerStatus(ds.client, req.ContainerId)
  102. if err != nil {
  103. return nil, err
  104. }
  105. return ds.streamingServer.GetAttach(req)
  106. }
  107. // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
  108. func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
  109. if ds.streamingServer == nil {
  110. return nil, streaming.NewErrorStreamingDisabled("port forward")
  111. }
  112. _, err := checkContainerStatus(ds.client, req.PodSandboxId)
  113. if err != nil {
  114. return nil, err
  115. }
  116. // TODO(tallclair): Verify that ports are exposed.
  117. return ds.streamingServer.GetPortForward(req)
  118. }
  119. func checkContainerStatus(client libdocker.Interface, containerID string) (*dockertypes.ContainerJSON, error) {
  120. container, err := client.InspectContainer(containerID)
  121. if err != nil {
  122. return nil, err
  123. }
  124. if !container.State.Running {
  125. return nil, fmt.Errorf("container not running (%s)", container.ID)
  126. }
  127. return container, nil
  128. }
  129. func attachContainer(client libdocker.Interface, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
  130. // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
  131. // call :-( Otherwise, resize events don't get processed and the terminal never resizes.
  132. kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) {
  133. client.ResizeContainerTTY(containerID, uint(size.Height), uint(size.Width))
  134. })
  135. // TODO(random-liu): Do we really use the *Logs* field here?
  136. opts := dockertypes.ContainerAttachOptions{
  137. Stream: true,
  138. Stdin: stdin != nil,
  139. Stdout: stdout != nil,
  140. Stderr: stderr != nil,
  141. }
  142. sopts := libdocker.StreamOptions{
  143. InputStream: stdin,
  144. OutputStream: stdout,
  145. ErrorStream: stderr,
  146. RawTerminal: tty,
  147. }
  148. return client.AttachToContainer(containerID, opts, sopts)
  149. }