portfoward_test.go 6.4 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package tests
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "net"
  19. "net/http"
  20. "net/http/httptest"
  21. "net/url"
  22. "os"
  23. "strings"
  24. "sync"
  25. "testing"
  26. "time"
  27. "k8s.io/apimachinery/pkg/types"
  28. restclient "k8s.io/client-go/rest"
  29. . "k8s.io/client-go/tools/portforward"
  30. "k8s.io/client-go/transport/spdy"
  31. "k8s.io/kubernetes/pkg/kubelet/server/portforward"
  32. )
  33. // fakePortForwarder simulates port forwarding for testing. It implements
  34. // portforward.PortForwarder.
  35. type fakePortForwarder struct {
  36. lock sync.Mutex
  37. // stores data expected from the stream per port
  38. expected map[int32]string
  39. // stores data received from the stream per port
  40. received map[int32]string
  41. // data to be sent to the stream per port
  42. send map[int32]string
  43. }
  44. var _ portforward.PortForwarder = &fakePortForwarder{}
  45. func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
  46. defer stream.Close()
  47. // read from the client
  48. received := make([]byte, len(pf.expected[port]))
  49. n, err := stream.Read(received)
  50. if err != nil {
  51. return fmt.Errorf("error reading from client for port %d: %v", port, err)
  52. }
  53. if n != len(pf.expected[port]) {
  54. return fmt.Errorf("unexpected length read from client for port %d: got %d, expected %d. data=%q", port, n, len(pf.expected[port]), string(received))
  55. }
  56. // store the received content
  57. pf.lock.Lock()
  58. pf.received[port] = string(received)
  59. pf.lock.Unlock()
  60. // send the hardcoded data to the client
  61. io.Copy(stream, strings.NewReader(pf.send[port]))
  62. return nil
  63. }
  64. // fakePortForwardServer creates an HTTP server that can handle port forwarding
  65. // requests.
  66. func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[int32]string) http.HandlerFunc {
  67. return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  68. pf := &fakePortForwarder{
  69. expected: expectedFromClient,
  70. received: make(map[int32]string),
  71. send: serverSends,
  72. }
  73. portforward.ServePortForward(w, req, pf, "pod", "uid", nil, 0, 10*time.Second, portforward.SupportedProtocols)
  74. for port, expected := range expectedFromClient {
  75. actual, ok := pf.received[port]
  76. if !ok {
  77. t.Errorf("%s: server didn't receive any data for port %d", testName, port)
  78. continue
  79. }
  80. if expected != actual {
  81. t.Errorf("%s: server expected to receive %q, got %q for port %d", testName, expected, actual, port)
  82. }
  83. }
  84. for port, actual := range pf.received {
  85. if _, ok := expectedFromClient[port]; !ok {
  86. t.Errorf("%s: server unexpectedly received %q for port %d", testName, actual, port)
  87. }
  88. }
  89. })
  90. }
  91. func TestForwardPorts(t *testing.T) {
  92. tests := map[string]struct {
  93. ports []string
  94. clientSends map[int32]string
  95. serverSends map[int32]string
  96. }{
  97. "forward 1 port with no data either direction": {
  98. ports: []string{"5000"},
  99. },
  100. "forward 2 ports with bidirectional data": {
  101. ports: []string{"5001", "6000"},
  102. clientSends: map[int32]string{
  103. 5001: "abcd",
  104. 6000: "ghij",
  105. },
  106. serverSends: map[int32]string{
  107. 5001: "1234",
  108. 6000: "5678",
  109. },
  110. },
  111. }
  112. for testName, test := range tests {
  113. server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends))
  114. transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{})
  115. if err != nil {
  116. t.Fatal(err)
  117. }
  118. url, _ := url.Parse(server.URL)
  119. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
  120. stopChan := make(chan struct{}, 1)
  121. readyChan := make(chan struct{})
  122. pf, err := New(dialer, test.ports, stopChan, readyChan, os.Stdout, os.Stderr)
  123. if err != nil {
  124. t.Fatalf("%s: unexpected error calling New: %v", testName, err)
  125. }
  126. doneChan := make(chan error)
  127. go func() {
  128. doneChan <- pf.ForwardPorts()
  129. }()
  130. <-pf.Ready
  131. for port, data := range test.clientSends {
  132. clientConn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port))
  133. if err != nil {
  134. t.Errorf("%s: error dialing %d: %s", testName, port, err)
  135. server.Close()
  136. continue
  137. }
  138. defer clientConn.Close()
  139. n, err := clientConn.Write([]byte(data))
  140. if err != nil && err != io.EOF {
  141. t.Errorf("%s: Error sending data '%s': %s", testName, data, err)
  142. server.Close()
  143. continue
  144. }
  145. if n == 0 {
  146. t.Errorf("%s: unexpected write of 0 bytes", testName)
  147. server.Close()
  148. continue
  149. }
  150. b := make([]byte, 4)
  151. _, err = clientConn.Read(b)
  152. if err != nil && err != io.EOF {
  153. t.Errorf("%s: Error reading data: %s", testName, err)
  154. server.Close()
  155. continue
  156. }
  157. if !bytes.Equal([]byte(test.serverSends[port]), b) {
  158. t.Errorf("%s: expected to read '%s', got '%s'", testName, test.serverSends[port], b)
  159. server.Close()
  160. continue
  161. }
  162. }
  163. // tell r.ForwardPorts to stop
  164. close(stopChan)
  165. // wait for r.ForwardPorts to actually return
  166. err = <-doneChan
  167. if err != nil {
  168. t.Errorf("%s: unexpected error: %s", testName, err)
  169. }
  170. server.Close()
  171. }
  172. }
  173. func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
  174. server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil))
  175. defer server.Close()
  176. transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{})
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. url, _ := url.Parse(server.URL)
  181. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
  182. stopChan1 := make(chan struct{}, 1)
  183. defer close(stopChan1)
  184. readyChan1 := make(chan struct{})
  185. pf1, err := New(dialer, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr)
  186. if err != nil {
  187. t.Fatalf("error creating pf1: %v", err)
  188. }
  189. go pf1.ForwardPorts()
  190. <-pf1.Ready
  191. stopChan2 := make(chan struct{}, 1)
  192. readyChan2 := make(chan struct{})
  193. pf2, err := New(dialer, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr)
  194. if err != nil {
  195. t.Fatalf("error creating pf2: %v", err)
  196. }
  197. if err := pf2.ForwardPorts(); err == nil {
  198. t.Fatal("expected non-nil error for pf2.ForwardPorts")
  199. }
  200. }