123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- package spdystream
- import (
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "sync"
- "time"
- "github.com/docker/spdystream/spdy"
- )
// ErrUnreadPartialData is returned by ReadData when bytes left over from an
// earlier Read call have not been consumed yet.
var ErrUnreadPartialData = errors.New("unread partial data")
- type Stream struct {
- streamId spdy.StreamId
- parent *Stream
- conn *Connection
- startChan chan error
- dataLock sync.RWMutex
- dataChan chan []byte
- unread []byte
- priority uint8
- headers http.Header
- headerChan chan http.Header
- finishLock sync.Mutex
- finished bool
- replyCond *sync.Cond
- replied bool
- closeLock sync.Mutex
- closeChan chan bool
- }
- // WriteData writes data to stream, sending a dataframe per call
- func (s *Stream) WriteData(data []byte, fin bool) error {
- s.waitWriteReply()
- var flags spdy.DataFlags
- if fin {
- flags = spdy.DataFlagFin
- s.finishLock.Lock()
- if s.finished {
- s.finishLock.Unlock()
- return ErrWriteClosedStream
- }
- s.finished = true
- s.finishLock.Unlock()
- }
- dataFrame := &spdy.DataFrame{
- StreamId: s.streamId,
- Flags: flags,
- Data: data,
- }
- debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
- return s.conn.framer.WriteFrame(dataFrame)
- }
- // Write writes bytes to a stream, calling write data for each call.
- func (s *Stream) Write(data []byte) (n int, err error) {
- err = s.WriteData(data, false)
- if err == nil {
- n = len(data)
- }
- return
- }
- // Read reads bytes from a stream, a single read will never get more
- // than what is sent on a single data frame, but a multiple calls to
- // read may get data from the same data frame.
- func (s *Stream) Read(p []byte) (n int, err error) {
- if s.unread == nil {
- select {
- case <-s.closeChan:
- return 0, io.EOF
- case read, ok := <-s.dataChan:
- if !ok {
- return 0, io.EOF
- }
- s.unread = read
- }
- }
- n = copy(p, s.unread)
- if n < len(s.unread) {
- s.unread = s.unread[n:]
- } else {
- s.unread = nil
- }
- return
- }
- // ReadData reads an entire data frame and returns the byte array
- // from the data frame. If there is unread data from the result
- // of a Read call, this function will return an ErrUnreadPartialData.
- func (s *Stream) ReadData() ([]byte, error) {
- debugMessage("(%p) Reading data from %d", s, s.streamId)
- if s.unread != nil {
- return nil, ErrUnreadPartialData
- }
- select {
- case <-s.closeChan:
- return nil, io.EOF
- case read, ok := <-s.dataChan:
- if !ok {
- return nil, io.EOF
- }
- return read, nil
- }
- }
- func (s *Stream) waitWriteReply() {
- if s.replyCond != nil {
- s.replyCond.L.Lock()
- for !s.replied {
- s.replyCond.Wait()
- }
- s.replyCond.L.Unlock()
- }
- }
- // Wait waits for the stream to receive a reply.
- func (s *Stream) Wait() error {
- return s.WaitTimeout(time.Duration(0))
- }
- // WaitTimeout waits for the stream to receive a reply or for timeout.
- // When the timeout is reached, ErrTimeout will be returned.
- func (s *Stream) WaitTimeout(timeout time.Duration) error {
- var timeoutChan <-chan time.Time
- if timeout > time.Duration(0) {
- timeoutChan = time.After(timeout)
- }
- select {
- case err := <-s.startChan:
- if err != nil {
- return err
- }
- break
- case <-timeoutChan:
- return ErrTimeout
- }
- return nil
- }
- // Close closes the stream by sending an empty data frame with the
- // finish flag set, indicating this side is finished with the stream.
- func (s *Stream) Close() error {
- select {
- case <-s.closeChan:
- // Stream is now fully closed
- s.conn.removeStream(s)
- default:
- break
- }
- return s.WriteData([]byte{}, true)
- }
- // Reset sends a reset frame, putting the stream into the fully closed state.
- func (s *Stream) Reset() error {
- s.conn.removeStream(s)
- return s.resetStream()
- }
- func (s *Stream) resetStream() error {
- // Always call closeRemoteChannels, even if s.finished is already true.
- // This makes it so that stream.Close() followed by stream.Reset() allows
- // stream.Read() to unblock.
- s.closeRemoteChannels()
- s.finishLock.Lock()
- if s.finished {
- s.finishLock.Unlock()
- return nil
- }
- s.finished = true
- s.finishLock.Unlock()
- resetFrame := &spdy.RstStreamFrame{
- StreamId: s.streamId,
- Status: spdy.Cancel,
- }
- return s.conn.framer.WriteFrame(resetFrame)
- }
- // CreateSubStream creates a stream using the current as the parent
- func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
- return s.conn.CreateStream(headers, s, fin)
- }
- // SetPriority sets the stream priority, does not affect the
- // remote priority of this stream after Open has been called.
- // Valid values are 0 through 7, 0 being the highest priority
- // and 7 the lowest.
- func (s *Stream) SetPriority(priority uint8) {
- s.priority = priority
- }
- // SendHeader sends a header frame across the stream
- func (s *Stream) SendHeader(headers http.Header, fin bool) error {
- return s.conn.sendHeaders(headers, s, fin)
- }
- // SendReply sends a reply on a stream, only valid to be called once
- // when handling a new stream
- func (s *Stream) SendReply(headers http.Header, fin bool) error {
- if s.replyCond == nil {
- return errors.New("cannot reply on initiated stream")
- }
- s.replyCond.L.Lock()
- defer s.replyCond.L.Unlock()
- if s.replied {
- return nil
- }
- err := s.conn.sendReply(headers, s, fin)
- if err != nil {
- return err
- }
- s.replied = true
- s.replyCond.Broadcast()
- return nil
- }
- // Refuse sends a reset frame with the status refuse, only
- // valid to be called once when handling a new stream. This
- // may be used to indicate that a stream is not allowed
- // when http status codes are not being used.
- func (s *Stream) Refuse() error {
- if s.replied {
- return nil
- }
- s.replied = true
- return s.conn.sendReset(spdy.RefusedStream, s)
- }
- // Cancel sends a reset frame with the status canceled. This
- // can be used at any time by the creator of the Stream to
- // indicate the stream is no longer needed.
- func (s *Stream) Cancel() error {
- return s.conn.sendReset(spdy.Cancel, s)
- }
- // ReceiveHeader receives a header sent on the other side
- // of the stream. This function will block until a header
- // is received or stream is closed.
- func (s *Stream) ReceiveHeader() (http.Header, error) {
- select {
- case <-s.closeChan:
- break
- case header, ok := <-s.headerChan:
- if !ok {
- return nil, fmt.Errorf("header chan closed")
- }
- return header, nil
- }
- return nil, fmt.Errorf("stream closed")
- }
- // Parent returns the parent stream
- func (s *Stream) Parent() *Stream {
- return s.parent
- }
- // Headers returns the headers used to create the stream
- func (s *Stream) Headers() http.Header {
- return s.headers
- }
- // String returns the string version of stream using the
- // streamId to uniquely identify the stream
- func (s *Stream) String() string {
- return fmt.Sprintf("stream:%d", s.streamId)
- }
- // Identifier returns a 32 bit identifier for the stream
- func (s *Stream) Identifier() uint32 {
- return uint32(s.streamId)
- }
- // IsFinished returns whether the stream has finished
- // sending data
- func (s *Stream) IsFinished() bool {
- return s.finished
- }
- // Implement net.Conn interface
- func (s *Stream) LocalAddr() net.Addr {
- return s.conn.conn.LocalAddr()
- }
- func (s *Stream) RemoteAddr() net.Addr {
- return s.conn.conn.RemoteAddr()
- }
- // TODO set per stream values instead of connection-wide
- func (s *Stream) SetDeadline(t time.Time) error {
- return s.conn.conn.SetDeadline(t)
- }
- func (s *Stream) SetReadDeadline(t time.Time) error {
- return s.conn.conn.SetReadDeadline(t)
- }
- func (s *Stream) SetWriteDeadline(t time.Time) error {
- return s.conn.conn.SetWriteDeadline(t)
- }
- func (s *Stream) closeRemoteChannels() {
- s.closeLock.Lock()
- defer s.closeLock.Unlock()
- select {
- case <-s.closeChan:
- default:
- close(s.closeChan)
- }
- }
|