httpstream_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package portforward
  14. import (
  15. "net/http"
  16. "testing"
  17. "time"
  18. "k8s.io/apimachinery/pkg/util/httpstream"
  19. api "k8s.io/kubernetes/pkg/apis/core"
  20. )
  21. func TestHTTPStreamReceived(t *testing.T) {
  22. tests := map[string]struct {
  23. port string
  24. streamType string
  25. expectedError string
  26. }{
  27. "missing port": {
  28. expectedError: `"port" header is required`,
  29. },
  30. "unable to parse port": {
  31. port: "abc",
  32. expectedError: `unable to parse "abc" as a port: strconv.ParseUint: parsing "abc": invalid syntax`,
  33. },
  34. "negative port": {
  35. port: "-1",
  36. expectedError: `unable to parse "-1" as a port: strconv.ParseUint: parsing "-1": invalid syntax`,
  37. },
  38. "missing stream type": {
  39. port: "80",
  40. expectedError: `"streamType" header is required`,
  41. },
  42. "valid port with error stream": {
  43. port: "80",
  44. streamType: "error",
  45. },
  46. "valid port with data stream": {
  47. port: "80",
  48. streamType: "data",
  49. },
  50. "invalid stream type": {
  51. port: "80",
  52. streamType: "foo",
  53. expectedError: `invalid stream type "foo"`,
  54. },
  55. }
  56. for name, test := range tests {
  57. streams := make(chan httpstream.Stream, 1)
  58. f := httpStreamReceived(streams)
  59. stream := newFakeHTTPStream()
  60. if len(test.port) > 0 {
  61. stream.headers.Set("port", test.port)
  62. }
  63. if len(test.streamType) > 0 {
  64. stream.headers.Set("streamType", test.streamType)
  65. }
  66. replySent := make(chan struct{})
  67. err := f(stream, replySent)
  68. close(replySent)
  69. if len(test.expectedError) > 0 {
  70. if err == nil {
  71. t.Errorf("%s: expected err=%q, but it was nil", name, test.expectedError)
  72. }
  73. if e, a := test.expectedError, err.Error(); e != a {
  74. t.Errorf("%s: expected err=%q, got %q", name, e, a)
  75. }
  76. continue
  77. }
  78. if err != nil {
  79. t.Errorf("%s: unexpected error %v", name, err)
  80. continue
  81. }
  82. if s := <-streams; s != stream {
  83. t.Errorf("%s: expected stream %#v, got %#v", name, stream, s)
  84. }
  85. }
  86. }
  87. func TestGetStreamPair(t *testing.T) {
  88. timeout := make(chan time.Time)
  89. h := &httpStreamHandler{
  90. streamPairs: make(map[string]*httpStreamPair),
  91. }
  92. // test adding a new entry
  93. p, created := h.getStreamPair("1")
  94. if p == nil {
  95. t.Fatalf("unexpected nil pair")
  96. }
  97. if !created {
  98. t.Fatal("expected created=true")
  99. }
  100. if p.dataStream != nil {
  101. t.Errorf("unexpected non-nil data stream")
  102. }
  103. if p.errorStream != nil {
  104. t.Errorf("unexpected non-nil error stream")
  105. }
  106. // start the monitor for this pair
  107. monitorDone := make(chan struct{})
  108. go func() {
  109. h.monitorStreamPair(p, timeout)
  110. close(monitorDone)
  111. }()
  112. if !h.hasStreamPair("1") {
  113. t.Fatal("This should still be true")
  114. }
  115. // make sure we can retrieve an existing entry
  116. p2, created := h.getStreamPair("1")
  117. if created {
  118. t.Fatal("expected created=false")
  119. }
  120. if p != p2 {
  121. t.Fatalf("retrieving an existing pair: expected %#v, got %#v", p, p2)
  122. }
  123. // removed via complete
  124. dataStream := newFakeHTTPStream()
  125. dataStream.headers.Set(api.StreamType, api.StreamTypeData)
  126. complete, err := p.add(dataStream)
  127. if err != nil {
  128. t.Fatalf("unexpected error adding data stream to pair: %v", err)
  129. }
  130. if complete {
  131. t.Fatalf("unexpected complete")
  132. }
  133. errorStream := newFakeHTTPStream()
  134. errorStream.headers.Set(api.StreamType, api.StreamTypeError)
  135. complete, err = p.add(errorStream)
  136. if err != nil {
  137. t.Fatalf("unexpected error adding error stream to pair: %v", err)
  138. }
  139. if !complete {
  140. t.Fatal("unexpected incomplete")
  141. }
  142. // make sure monitorStreamPair completed
  143. <-monitorDone
  144. // make sure the pair was removed
  145. if h.hasStreamPair("1") {
  146. t.Fatal("expected removal of pair after both data and error streams received")
  147. }
  148. // removed via timeout
  149. p, created = h.getStreamPair("2")
  150. if !created {
  151. t.Fatal("expected created=true")
  152. }
  153. if p == nil {
  154. t.Fatal("expected p not to be nil")
  155. }
  156. monitorDone = make(chan struct{})
  157. go func() {
  158. h.monitorStreamPair(p, timeout)
  159. close(monitorDone)
  160. }()
  161. // cause the timeout
  162. close(timeout)
  163. // make sure monitorStreamPair completed
  164. <-monitorDone
  165. if h.hasStreamPair("2") {
  166. t.Fatal("expected stream pair to be removed")
  167. }
  168. }
  169. func TestRequestID(t *testing.T) {
  170. h := &httpStreamHandler{}
  171. s := newFakeHTTPStream()
  172. s.headers.Set(api.StreamType, api.StreamTypeError)
  173. s.id = 1
  174. if e, a := "1", h.requestID(s); e != a {
  175. t.Errorf("expected %q, got %q", e, a)
  176. }
  177. s.headers.Set(api.StreamType, api.StreamTypeData)
  178. s.id = 3
  179. if e, a := "1", h.requestID(s); e != a {
  180. t.Errorf("expected %q, got %q", e, a)
  181. }
  182. s.id = 7
  183. s.headers.Set(api.PortForwardRequestIDHeader, "2")
  184. if e, a := "2", h.requestID(s); e != a {
  185. t.Errorf("expected %q, got %q", e, a)
  186. }
  187. }
  188. type fakeHTTPStream struct {
  189. headers http.Header
  190. id uint32
  191. }
  192. func newFakeHTTPStream() *fakeHTTPStream {
  193. return &fakeHTTPStream{
  194. headers: make(http.Header),
  195. }
  196. }
  197. var _ httpstream.Stream = &fakeHTTPStream{}
  198. func (s *fakeHTTPStream) Read(data []byte) (int, error) {
  199. return 0, nil
  200. }
  201. func (s *fakeHTTPStream) Write(data []byte) (int, error) {
  202. return 0, nil
  203. }
  204. func (s *fakeHTTPStream) Close() error {
  205. return nil
  206. }
  207. func (s *fakeHTTPStream) Reset() error {
  208. return nil
  209. }
  210. func (s *fakeHTTPStream) Headers() http.Header {
  211. return s.headers
  212. }
  213. func (s *fakeHTTPStream) Identifier() uint32 {
  214. return s.id
  215. }