stream.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package spdystream
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/docker/spdystream/spdy"
  11. )
  12. var (
  13. ErrUnreadPartialData = errors.New("unread partial data")
  14. )
  15. type Stream struct {
  16. streamId spdy.StreamId
  17. parent *Stream
  18. conn *Connection
  19. startChan chan error
  20. dataLock sync.RWMutex
  21. dataChan chan []byte
  22. unread []byte
  23. priority uint8
  24. headers http.Header
  25. headerChan chan http.Header
  26. finishLock sync.Mutex
  27. finished bool
  28. replyCond *sync.Cond
  29. replied bool
  30. closeLock sync.Mutex
  31. closeChan chan bool
  32. }
  33. // WriteData writes data to stream, sending a dataframe per call
  34. func (s *Stream) WriteData(data []byte, fin bool) error {
  35. s.waitWriteReply()
  36. var flags spdy.DataFlags
  37. if fin {
  38. flags = spdy.DataFlagFin
  39. s.finishLock.Lock()
  40. if s.finished {
  41. s.finishLock.Unlock()
  42. return ErrWriteClosedStream
  43. }
  44. s.finished = true
  45. s.finishLock.Unlock()
  46. }
  47. dataFrame := &spdy.DataFrame{
  48. StreamId: s.streamId,
  49. Flags: flags,
  50. Data: data,
  51. }
  52. debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
  53. return s.conn.framer.WriteFrame(dataFrame)
  54. }
  55. // Write writes bytes to a stream, calling write data for each call.
  56. func (s *Stream) Write(data []byte) (n int, err error) {
  57. err = s.WriteData(data, false)
  58. if err == nil {
  59. n = len(data)
  60. }
  61. return
  62. }
  63. // Read reads bytes from a stream, a single read will never get more
  64. // than what is sent on a single data frame, but a multiple calls to
  65. // read may get data from the same data frame.
  66. func (s *Stream) Read(p []byte) (n int, err error) {
  67. if s.unread == nil {
  68. select {
  69. case <-s.closeChan:
  70. return 0, io.EOF
  71. case read, ok := <-s.dataChan:
  72. if !ok {
  73. return 0, io.EOF
  74. }
  75. s.unread = read
  76. }
  77. }
  78. n = copy(p, s.unread)
  79. if n < len(s.unread) {
  80. s.unread = s.unread[n:]
  81. } else {
  82. s.unread = nil
  83. }
  84. return
  85. }
  86. // ReadData reads an entire data frame and returns the byte array
  87. // from the data frame. If there is unread data from the result
  88. // of a Read call, this function will return an ErrUnreadPartialData.
  89. func (s *Stream) ReadData() ([]byte, error) {
  90. debugMessage("(%p) Reading data from %d", s, s.streamId)
  91. if s.unread != nil {
  92. return nil, ErrUnreadPartialData
  93. }
  94. select {
  95. case <-s.closeChan:
  96. return nil, io.EOF
  97. case read, ok := <-s.dataChan:
  98. if !ok {
  99. return nil, io.EOF
  100. }
  101. return read, nil
  102. }
  103. }
  104. func (s *Stream) waitWriteReply() {
  105. if s.replyCond != nil {
  106. s.replyCond.L.Lock()
  107. for !s.replied {
  108. s.replyCond.Wait()
  109. }
  110. s.replyCond.L.Unlock()
  111. }
  112. }
  113. // Wait waits for the stream to receive a reply.
  114. func (s *Stream) Wait() error {
  115. return s.WaitTimeout(time.Duration(0))
  116. }
  117. // WaitTimeout waits for the stream to receive a reply or for timeout.
  118. // When the timeout is reached, ErrTimeout will be returned.
  119. func (s *Stream) WaitTimeout(timeout time.Duration) error {
  120. var timeoutChan <-chan time.Time
  121. if timeout > time.Duration(0) {
  122. timeoutChan = time.After(timeout)
  123. }
  124. select {
  125. case err := <-s.startChan:
  126. if err != nil {
  127. return err
  128. }
  129. break
  130. case <-timeoutChan:
  131. return ErrTimeout
  132. }
  133. return nil
  134. }
  135. // Close closes the stream by sending an empty data frame with the
  136. // finish flag set, indicating this side is finished with the stream.
  137. func (s *Stream) Close() error {
  138. select {
  139. case <-s.closeChan:
  140. // Stream is now fully closed
  141. s.conn.removeStream(s)
  142. default:
  143. break
  144. }
  145. return s.WriteData([]byte{}, true)
  146. }
  147. // Reset sends a reset frame, putting the stream into the fully closed state.
  148. func (s *Stream) Reset() error {
  149. s.conn.removeStream(s)
  150. return s.resetStream()
  151. }
  152. func (s *Stream) resetStream() error {
  153. // Always call closeRemoteChannels, even if s.finished is already true.
  154. // This makes it so that stream.Close() followed by stream.Reset() allows
  155. // stream.Read() to unblock.
  156. s.closeRemoteChannels()
  157. s.finishLock.Lock()
  158. if s.finished {
  159. s.finishLock.Unlock()
  160. return nil
  161. }
  162. s.finished = true
  163. s.finishLock.Unlock()
  164. resetFrame := &spdy.RstStreamFrame{
  165. StreamId: s.streamId,
  166. Status: spdy.Cancel,
  167. }
  168. return s.conn.framer.WriteFrame(resetFrame)
  169. }
  170. // CreateSubStream creates a stream using the current as the parent
  171. func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
  172. return s.conn.CreateStream(headers, s, fin)
  173. }
  174. // SetPriority sets the stream priority, does not affect the
  175. // remote priority of this stream after Open has been called.
  176. // Valid values are 0 through 7, 0 being the highest priority
  177. // and 7 the lowest.
  178. func (s *Stream) SetPriority(priority uint8) {
  179. s.priority = priority
  180. }
  181. // SendHeader sends a header frame across the stream
  182. func (s *Stream) SendHeader(headers http.Header, fin bool) error {
  183. return s.conn.sendHeaders(headers, s, fin)
  184. }
  185. // SendReply sends a reply on a stream, only valid to be called once
  186. // when handling a new stream
  187. func (s *Stream) SendReply(headers http.Header, fin bool) error {
  188. if s.replyCond == nil {
  189. return errors.New("cannot reply on initiated stream")
  190. }
  191. s.replyCond.L.Lock()
  192. defer s.replyCond.L.Unlock()
  193. if s.replied {
  194. return nil
  195. }
  196. err := s.conn.sendReply(headers, s, fin)
  197. if err != nil {
  198. return err
  199. }
  200. s.replied = true
  201. s.replyCond.Broadcast()
  202. return nil
  203. }
  204. // Refuse sends a reset frame with the status refuse, only
  205. // valid to be called once when handling a new stream. This
  206. // may be used to indicate that a stream is not allowed
  207. // when http status codes are not being used.
  208. func (s *Stream) Refuse() error {
  209. if s.replied {
  210. return nil
  211. }
  212. s.replied = true
  213. return s.conn.sendReset(spdy.RefusedStream, s)
  214. }
  215. // Cancel sends a reset frame with the status canceled. This
  216. // can be used at any time by the creator of the Stream to
  217. // indicate the stream is no longer needed.
  218. func (s *Stream) Cancel() error {
  219. return s.conn.sendReset(spdy.Cancel, s)
  220. }
  221. // ReceiveHeader receives a header sent on the other side
  222. // of the stream. This function will block until a header
  223. // is received or stream is closed.
  224. func (s *Stream) ReceiveHeader() (http.Header, error) {
  225. select {
  226. case <-s.closeChan:
  227. break
  228. case header, ok := <-s.headerChan:
  229. if !ok {
  230. return nil, fmt.Errorf("header chan closed")
  231. }
  232. return header, nil
  233. }
  234. return nil, fmt.Errorf("stream closed")
  235. }
  236. // Parent returns the parent stream
  237. func (s *Stream) Parent() *Stream {
  238. return s.parent
  239. }
  240. // Headers returns the headers used to create the stream
  241. func (s *Stream) Headers() http.Header {
  242. return s.headers
  243. }
  244. // String returns the string version of stream using the
  245. // streamId to uniquely identify the stream
  246. func (s *Stream) String() string {
  247. return fmt.Sprintf("stream:%d", s.streamId)
  248. }
  249. // Identifier returns a 32 bit identifier for the stream
  250. func (s *Stream) Identifier() uint32 {
  251. return uint32(s.streamId)
  252. }
  253. // IsFinished returns whether the stream has finished
  254. // sending data
  255. func (s *Stream) IsFinished() bool {
  256. return s.finished
  257. }
  258. // Implement net.Conn interface
  259. func (s *Stream) LocalAddr() net.Addr {
  260. return s.conn.conn.LocalAddr()
  261. }
  262. func (s *Stream) RemoteAddr() net.Addr {
  263. return s.conn.conn.RemoteAddr()
  264. }
  265. // TODO set per stream values instead of connection-wide
  266. func (s *Stream) SetDeadline(t time.Time) error {
  267. return s.conn.conn.SetDeadline(t)
  268. }
  269. func (s *Stream) SetReadDeadline(t time.Time) error {
  270. return s.conn.conn.SetReadDeadline(t)
  271. }
  272. func (s *Stream) SetWriteDeadline(t time.Time) error {
  273. return s.conn.conn.SetWriteDeadline(t)
  274. }
  275. func (s *Stream) closeRemoteChannels() {
  276. s.closeLock.Lock()
  277. defer s.closeLock.Unlock()
  278. select {
  279. case <-s.closeChan:
  280. default:
  281. close(s.closeChan)
  282. }
  283. }