123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959 |
- package spdystream
- import (
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "sync"
- "time"
- "github.com/docker/spdystream/spdy"
- )
- var (
- ErrInvalidStreamId = errors.New("Invalid stream id")
- ErrTimeout = errors.New("Timeout occured")
- ErrReset = errors.New("Stream reset")
- ErrWriteClosedStream = errors.New("Write on closed stream")
- )
- const (
- FRAME_WORKERS = 5
- QUEUE_SIZE = 50
- )
- type StreamHandler func(stream *Stream)
- type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
- type idleAwareFramer struct {
- f *spdy.Framer
- conn *Connection
- writeLock sync.Mutex
- resetChan chan struct{}
- setTimeoutLock sync.Mutex
- setTimeoutChan chan time.Duration
- timeout time.Duration
- }
- func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
- iaf := &idleAwareFramer{
- f: framer,
- resetChan: make(chan struct{}, 2),
- // setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about
- // the same time the connection is being closed
- setTimeoutChan: make(chan time.Duration, 1),
- }
- return iaf
- }
- func (i *idleAwareFramer) monitor() {
- var (
- timer *time.Timer
- expired <-chan time.Time
- resetChan = i.resetChan
- setTimeoutChan = i.setTimeoutChan
- )
- Loop:
- for {
- select {
- case timeout := <-i.setTimeoutChan:
- i.timeout = timeout
- if timeout == 0 {
- if timer != nil {
- timer.Stop()
- }
- } else {
- if timer == nil {
- timer = time.NewTimer(timeout)
- expired = timer.C
- } else {
- timer.Reset(timeout)
- }
- }
- case <-resetChan:
- if timer != nil && i.timeout > 0 {
- timer.Reset(i.timeout)
- }
- case <-expired:
- i.conn.streamCond.L.Lock()
- streams := i.conn.streams
- i.conn.streams = make(map[spdy.StreamId]*Stream)
- i.conn.streamCond.Broadcast()
- i.conn.streamCond.L.Unlock()
- go func() {
- for _, stream := range streams {
- stream.resetStream()
- }
- i.conn.Close()
- }()
- case <-i.conn.closeChan:
- if timer != nil {
- timer.Stop()
- }
- // Start a goroutine to drain resetChan. This is needed because we've seen
- // some unit tests with large numbers of goroutines get into a situation
- // where resetChan fills up, at least 1 call to Write() is still trying to
- // send to resetChan, the connection gets closed, and this case statement
- // attempts to grab the write lock that Write() already has, causing a
- // deadlock.
- //
- // See https://github.com/docker/spdystream/issues/49 for more details.
- go func() {
- for _ = range resetChan {
- }
- }()
- go func() {
- for _ = range setTimeoutChan {
- }
- }()
- i.writeLock.Lock()
- close(resetChan)
- i.resetChan = nil
- i.writeLock.Unlock()
- i.setTimeoutLock.Lock()
- close(i.setTimeoutChan)
- i.setTimeoutChan = nil
- i.setTimeoutLock.Unlock()
- break Loop
- }
- }
- // Drain resetChan
- for _ = range resetChan {
- }
- }
- func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
- i.writeLock.Lock()
- defer i.writeLock.Unlock()
- if i.resetChan == nil {
- return io.EOF
- }
- err := i.f.WriteFrame(frame)
- if err != nil {
- return err
- }
- i.resetChan <- struct{}{}
- return nil
- }
- func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
- frame, err := i.f.ReadFrame()
- if err != nil {
- return nil, err
- }
- // resetChan should never be closed since it is only closed
- // when the connection has closed its closeChan. This closure
- // only occurs after all Reads have finished
- // TODO (dmcgowan): refactor relationship into connection
- i.resetChan <- struct{}{}
- return frame, nil
- }
- func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) {
- i.setTimeoutLock.Lock()
- defer i.setTimeoutLock.Unlock()
- if i.setTimeoutChan == nil {
- return
- }
- i.setTimeoutChan <- timeout
- }
- type Connection struct {
- conn net.Conn
- framer *idleAwareFramer
- closeChan chan bool
- goneAway bool
- lastStreamChan chan<- *Stream
- goAwayTimeout time.Duration
- closeTimeout time.Duration
- streamLock *sync.RWMutex
- streamCond *sync.Cond
- streams map[spdy.StreamId]*Stream
- nextIdLock sync.Mutex
- receiveIdLock sync.Mutex
- nextStreamId spdy.StreamId
- receivedStreamId spdy.StreamId
- pingIdLock sync.Mutex
- pingId uint32
- pingChans map[uint32]chan error
- shutdownLock sync.Mutex
- shutdownChan chan error
- hasShutdown bool
- // for testing https://github.com/docker/spdystream/pull/56
- dataFrameHandler func(*spdy.DataFrame) error
- }
- // NewConnection creates a new spdy connection from an existing
- // network connection.
- func NewConnection(conn net.Conn, server bool) (*Connection, error) {
- framer, framerErr := spdy.NewFramer(conn, conn)
- if framerErr != nil {
- return nil, framerErr
- }
- idleAwareFramer := newIdleAwareFramer(framer)
- var sid spdy.StreamId
- var rid spdy.StreamId
- var pid uint32
- if server {
- sid = 2
- rid = 1
- pid = 2
- } else {
- sid = 1
- rid = 2
- pid = 1
- }
- streamLock := new(sync.RWMutex)
- streamCond := sync.NewCond(streamLock)
- session := &Connection{
- conn: conn,
- framer: idleAwareFramer,
- closeChan: make(chan bool),
- goAwayTimeout: time.Duration(0),
- closeTimeout: time.Duration(0),
- streamLock: streamLock,
- streamCond: streamCond,
- streams: make(map[spdy.StreamId]*Stream),
- nextStreamId: sid,
- receivedStreamId: rid,
- pingId: pid,
- pingChans: make(map[uint32]chan error),
- shutdownChan: make(chan error),
- }
- session.dataFrameHandler = session.handleDataFrame
- idleAwareFramer.conn = session
- go idleAwareFramer.monitor()
- return session, nil
- }
- // Ping sends a ping frame across the connection and
- // returns the response time
- func (s *Connection) Ping() (time.Duration, error) {
- pid := s.pingId
- s.pingIdLock.Lock()
- if s.pingId > 0x7ffffffe {
- s.pingId = s.pingId - 0x7ffffffe
- } else {
- s.pingId = s.pingId + 2
- }
- s.pingIdLock.Unlock()
- pingChan := make(chan error)
- s.pingChans[pid] = pingChan
- defer delete(s.pingChans, pid)
- frame := &spdy.PingFrame{Id: pid}
- startTime := time.Now()
- writeErr := s.framer.WriteFrame(frame)
- if writeErr != nil {
- return time.Duration(0), writeErr
- }
- select {
- case <-s.closeChan:
- return time.Duration(0), errors.New("connection closed")
- case err, ok := <-pingChan:
- if ok && err != nil {
- return time.Duration(0), err
- }
- break
- }
- return time.Now().Sub(startTime), nil
- }
- // Serve handles frames sent from the server, including reply frames
- // which are needed to fully initiate connections. Both clients and servers
- // should call Serve in a separate goroutine before creating streams.
- func (s *Connection) Serve(newHandler StreamHandler) {
- // use a WaitGroup to wait for all frames to be drained after receiving
- // go-away.
- var wg sync.WaitGroup
- // Parition queues to ensure stream frames are handled
- // by the same worker, ensuring order is maintained
- frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
- for i := 0; i < FRAME_WORKERS; i++ {
- frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
- // Ensure frame queue is drained when connection is closed
- go func(frameQueue *PriorityFrameQueue) {
- <-s.closeChan
- frameQueue.Drain()
- }(frameQueues[i])
- wg.Add(1)
- go func(frameQueue *PriorityFrameQueue) {
- // let the WaitGroup know this worker is done
- defer wg.Done()
- s.frameHandler(frameQueue, newHandler)
- }(frameQueues[i])
- }
- var (
- partitionRoundRobin int
- goAwayFrame *spdy.GoAwayFrame
- )
- Loop:
- for {
- readFrame, err := s.framer.ReadFrame()
- if err != nil {
- if err != io.EOF {
- fmt.Errorf("frame read error: %s", err)
- } else {
- debugMessage("(%p) EOF received", s)
- }
- break
- }
- var priority uint8
- var partition int
- switch frame := readFrame.(type) {
- case *spdy.SynStreamFrame:
- if s.checkStreamFrame(frame) {
- priority = frame.Priority
- partition = int(frame.StreamId % FRAME_WORKERS)
- debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
- s.addStreamFrame(frame)
- } else {
- debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
- continue
- }
- case *spdy.SynReplyFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.DataFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.RstStreamFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.HeadersFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.PingFrame:
- priority = 0
- partition = partitionRoundRobin
- partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
- case *spdy.GoAwayFrame:
- // hold on to the go away frame and exit the loop
- goAwayFrame = frame
- break Loop
- default:
- priority = 7
- partition = partitionRoundRobin
- partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
- }
- frameQueues[partition].Push(readFrame, priority)
- }
- close(s.closeChan)
- // wait for all frame handler workers to indicate they've drained their queues
- // before handling the go away frame
- wg.Wait()
- if goAwayFrame != nil {
- s.handleGoAwayFrame(goAwayFrame)
- }
- // now it's safe to close remote channels and empty s.streams
- s.streamCond.L.Lock()
- // notify streams that they're now closed, which will
- // unblock any stream Read() calls
- for _, stream := range s.streams {
- stream.closeRemoteChannels()
- }
- s.streams = make(map[spdy.StreamId]*Stream)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
- }
- func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
- for {
- popFrame := frameQueue.Pop()
- if popFrame == nil {
- return
- }
- var frameErr error
- switch frame := popFrame.(type) {
- case *spdy.SynStreamFrame:
- frameErr = s.handleStreamFrame(frame, newHandler)
- case *spdy.SynReplyFrame:
- frameErr = s.handleReplyFrame(frame)
- case *spdy.DataFrame:
- frameErr = s.dataFrameHandler(frame)
- case *spdy.RstStreamFrame:
- frameErr = s.handleResetFrame(frame)
- case *spdy.HeadersFrame:
- frameErr = s.handleHeaderFrame(frame)
- case *spdy.PingFrame:
- frameErr = s.handlePingFrame(frame)
- case *spdy.GoAwayFrame:
- frameErr = s.handleGoAwayFrame(frame)
- default:
- frameErr = fmt.Errorf("unhandled frame type: %T", frame)
- }
- if frameErr != nil {
- fmt.Errorf("frame handling error: %s", frameErr)
- }
- }
- }
- func (s *Connection) getStreamPriority(streamId spdy.StreamId) uint8 {
- stream, streamOk := s.getStream(streamId)
- if !streamOk {
- return 7
- }
- return stream.priority
- }
- func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
- var parent *Stream
- if frame.AssociatedToStreamId != spdy.StreamId(0) {
- parent, _ = s.getStream(frame.AssociatedToStreamId)
- }
- stream := &Stream{
- streamId: frame.StreamId,
- parent: parent,
- conn: s,
- startChan: make(chan error),
- headers: frame.Headers,
- finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
- replyCond: sync.NewCond(new(sync.Mutex)),
- dataChan: make(chan []byte),
- headerChan: make(chan http.Header),
- closeChan: make(chan bool),
- }
- if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
- stream.closeRemoteChannels()
- }
- s.addStream(stream)
- }
- // checkStreamFrame checks to see if a stream frame is allowed.
- // If the stream is invalid, then a reset frame with protocol error
- // will be returned.
- func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
- s.receiveIdLock.Lock()
- defer s.receiveIdLock.Unlock()
- if s.goneAway {
- return false
- }
- validationErr := s.validateStreamId(frame.StreamId)
- if validationErr != nil {
- go func() {
- resetErr := s.sendResetFrame(spdy.ProtocolError, frame.StreamId)
- if resetErr != nil {
- fmt.Errorf("reset error: %s", resetErr)
- }
- }()
- return false
- }
- return true
- }
- func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
- stream, ok := s.getStream(frame.StreamId)
- if !ok {
- return fmt.Errorf("Missing stream: %d", frame.StreamId)
- }
- newHandler(stream)
- return nil
- }
- func (s *Connection) handleReplyFrame(frame *spdy.SynReplyFrame) error {
- debugMessage("(%p) Reply frame received for %d", s, frame.StreamId)
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- debugMessage("Reply frame gone away for %d", frame.StreamId)
- // Stream has already gone away
- return nil
- }
- if stream.replied {
- // Stream has already received reply
- return nil
- }
- stream.replied = true
- // TODO Check for error
- if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
- close(stream.startChan)
- return nil
- }
- func (s *Connection) handleResetFrame(frame *spdy.RstStreamFrame) error {
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- // Stream has already been removed
- return nil
- }
- s.removeStream(stream)
- stream.closeRemoteChannels()
- if !stream.replied {
- stream.replied = true
- stream.startChan <- ErrReset
- close(stream.startChan)
- }
- stream.finishLock.Lock()
- stream.finished = true
- stream.finishLock.Unlock()
- return nil
- }
- func (s *Connection) handleHeaderFrame(frame *spdy.HeadersFrame) error {
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- // Stream has already gone away
- return nil
- }
- if !stream.replied {
- // No reply received...Protocol error?
- return nil
- }
- // TODO limit headers while not blocking (use buffered chan or goroutine?)
- select {
- case <-stream.closeChan:
- return nil
- case stream.headerChan <- frame.Headers:
- }
- if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
- return nil
- }
- func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
- debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
- // Stream has already gone away
- return nil
- }
- if !stream.replied {
- debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
- // No reply received...Protocol error?
- return nil
- }
- debugMessage("(%p) (%d) Data frame handling", stream, stream.streamId)
- if len(frame.Data) > 0 {
- stream.dataLock.RLock()
- select {
- case <-stream.closeChan:
- debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
- case stream.dataChan <- frame.Data:
- debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
- }
- stream.dataLock.RUnlock()
- }
- if (frame.Flags & spdy.DataFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
- return nil
- }
- func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
- if s.pingId&0x01 != frame.Id&0x01 {
- return s.framer.WriteFrame(frame)
- }
- pingChan, pingOk := s.pingChans[frame.Id]
- if pingOk {
- close(pingChan)
- }
- return nil
- }
- func (s *Connection) handleGoAwayFrame(frame *spdy.GoAwayFrame) error {
- debugMessage("(%p) Go away received", s)
- s.receiveIdLock.Lock()
- if s.goneAway {
- s.receiveIdLock.Unlock()
- return nil
- }
- s.goneAway = true
- s.receiveIdLock.Unlock()
- if s.lastStreamChan != nil {
- stream, _ := s.getStream(frame.LastGoodStreamId)
- go func() {
- s.lastStreamChan <- stream
- }()
- }
- // Do not block frame handler waiting for closure
- go s.shutdown(s.goAwayTimeout)
- return nil
- }
- func (s *Connection) remoteStreamFinish(stream *Stream) {
- stream.closeRemoteChannels()
- stream.finishLock.Lock()
- if stream.finished {
- // Stream is fully closed, cleanup
- s.removeStream(stream)
- }
- stream.finishLock.Unlock()
- }
- // CreateStream creates a new spdy stream using the parameters for
- // creating the stream frame. The stream frame will be sent upon
- // calling this function, however this function does not wait for
- // the reply frame. If waiting for the reply is desired, use
- // the stream Wait or WaitTimeout function on the stream returned
- // by this function.
- func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
- // MUST synchronize stream creation (all the way to writing the frame)
- // as stream IDs **MUST** increase monotonically.
- s.nextIdLock.Lock()
- defer s.nextIdLock.Unlock()
- streamId := s.getNextStreamId()
- if streamId == 0 {
- return nil, fmt.Errorf("Unable to get new stream id")
- }
- stream := &Stream{
- streamId: streamId,
- parent: parent,
- conn: s,
- startChan: make(chan error),
- headers: headers,
- dataChan: make(chan []byte),
- headerChan: make(chan http.Header),
- closeChan: make(chan bool),
- }
- debugMessage("(%p) (%p) Create stream", s, stream)
- s.addStream(stream)
- return stream, s.sendStream(stream, fin)
- }
- func (s *Connection) shutdown(closeTimeout time.Duration) {
- // TODO Ensure this isn't called multiple times
- s.shutdownLock.Lock()
- if s.hasShutdown {
- s.shutdownLock.Unlock()
- return
- }
- s.hasShutdown = true
- s.shutdownLock.Unlock()
- var timeout <-chan time.Time
- if closeTimeout > time.Duration(0) {
- timeout = time.After(closeTimeout)
- }
- streamsClosed := make(chan bool)
- go func() {
- s.streamCond.L.Lock()
- for len(s.streams) > 0 {
- debugMessage("Streams opened: %d, %#v", len(s.streams), s.streams)
- s.streamCond.Wait()
- }
- s.streamCond.L.Unlock()
- close(streamsClosed)
- }()
- var err error
- select {
- case <-streamsClosed:
- // No active streams, close should be safe
- err = s.conn.Close()
- case <-timeout:
- // Force ungraceful close
- err = s.conn.Close()
- // Wait for cleanup to clear active streams
- <-streamsClosed
- }
- if err != nil {
- duration := 10 * time.Minute
- time.AfterFunc(duration, func() {
- select {
- case err, ok := <-s.shutdownChan:
- if ok {
- fmt.Errorf("Unhandled close error after %s: %s", duration, err)
- }
- default:
- }
- })
- s.shutdownChan <- err
- }
- close(s.shutdownChan)
- return
- }
- // Closes spdy connection by sending GoAway frame and initiating shutdown
- func (s *Connection) Close() error {
- s.receiveIdLock.Lock()
- if s.goneAway {
- s.receiveIdLock.Unlock()
- return nil
- }
- s.goneAway = true
- s.receiveIdLock.Unlock()
- var lastStreamId spdy.StreamId
- if s.receivedStreamId > 2 {
- lastStreamId = s.receivedStreamId - 2
- }
- goAwayFrame := &spdy.GoAwayFrame{
- LastGoodStreamId: lastStreamId,
- Status: spdy.GoAwayOK,
- }
- err := s.framer.WriteFrame(goAwayFrame)
- if err != nil {
- return err
- }
- go s.shutdown(s.closeTimeout)
- return nil
- }
- // CloseWait closes the connection and waits for shutdown
- // to finish. Note the underlying network Connection
- // is not closed until the end of shutdown.
- func (s *Connection) CloseWait() error {
- closeErr := s.Close()
- if closeErr != nil {
- return closeErr
- }
- shutdownErr, ok := <-s.shutdownChan
- if ok {
- return shutdownErr
- }
- return nil
- }
- // Wait waits for the connection to finish shutdown or for
- // the wait timeout duration to expire. This needs to be
- // called either after Close has been called or the GOAWAYFRAME
- // has been received. If the wait timeout is 0, this function
- // will block until shutdown finishes. If wait is never called
- // and a shutdown error occurs, that error will be logged as an
- // unhandled error.
- func (s *Connection) Wait(waitTimeout time.Duration) error {
- var timeout <-chan time.Time
- if waitTimeout > time.Duration(0) {
- timeout = time.After(waitTimeout)
- }
- select {
- case err, ok := <-s.shutdownChan:
- if ok {
- return err
- }
- case <-timeout:
- return ErrTimeout
- }
- return nil
- }
- // NotifyClose registers a channel to be called when the remote
- // peer inidicates connection closure. The last stream to be
- // received by the remote will be sent on the channel. The notify
- // timeout will determine the duration between go away received
- // and the connection being closed.
- func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
- s.goAwayTimeout = timeout
- s.lastStreamChan = c
- }
- // SetCloseTimeout sets the amount of time close will wait for
- // streams to finish before terminating the underlying network
- // connection. Setting the timeout to 0 will cause close to
- // wait forever, which is the default.
- func (s *Connection) SetCloseTimeout(timeout time.Duration) {
- s.closeTimeout = timeout
- }
- // SetIdleTimeout sets the amount of time the connection may sit idle before
- // it is forcefully terminated.
- func (s *Connection) SetIdleTimeout(timeout time.Duration) {
- s.framer.setIdleTimeout(timeout)
- }
- func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- }
- headerFrame := &spdy.HeadersFrame{
- StreamId: stream.streamId,
- Headers: headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
- return s.framer.WriteFrame(headerFrame)
- }
- func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- }
- replyFrame := &spdy.SynReplyFrame{
- StreamId: stream.streamId,
- Headers: headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
- return s.framer.WriteFrame(replyFrame)
- }
- func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.StreamId) error {
- resetFrame := &spdy.RstStreamFrame{
- StreamId: streamId,
- Status: status,
- }
- return s.framer.WriteFrame(resetFrame)
- }
- func (s *Connection) sendReset(status spdy.RstStreamStatus, stream *Stream) error {
- return s.sendResetFrame(status, stream.streamId)
- }
- func (s *Connection) sendStream(stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- stream.finished = true
- }
- var parentId spdy.StreamId
- if stream.parent != nil {
- parentId = stream.parent.streamId
- }
- streamFrame := &spdy.SynStreamFrame{
- StreamId: spdy.StreamId(stream.streamId),
- AssociatedToStreamId: spdy.StreamId(parentId),
- Headers: stream.headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
- return s.framer.WriteFrame(streamFrame)
- }
- // getNextStreamId returns the next sequential id
- // every call should produce a unique value or an error
- func (s *Connection) getNextStreamId() spdy.StreamId {
- sid := s.nextStreamId
- if sid > 0x7fffffff {
- return 0
- }
- s.nextStreamId = s.nextStreamId + 2
- return sid
- }
- // PeekNextStreamId returns the next sequential id and keeps the next id untouched
- func (s *Connection) PeekNextStreamId() spdy.StreamId {
- sid := s.nextStreamId
- return sid
- }
- func (s *Connection) validateStreamId(rid spdy.StreamId) error {
- if rid > 0x7fffffff || rid < s.receivedStreamId {
- return ErrInvalidStreamId
- }
- s.receivedStreamId = rid + 2
- return nil
- }
- func (s *Connection) addStream(stream *Stream) {
- s.streamCond.L.Lock()
- s.streams[stream.streamId] = stream
- debugMessage("(%p) (%p) Stream added, broadcasting: %d", s, stream, stream.streamId)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
- }
- func (s *Connection) removeStream(stream *Stream) {
- s.streamCond.L.Lock()
- delete(s.streams, stream.streamId)
- debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
- }
- func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool) {
- s.streamLock.RLock()
- stream, ok = s.streams[streamId]
- s.streamLock.RUnlock()
- return
- }
- // FindStream looks up the given stream id and either waits for the
- // stream to be found or returns nil if the stream id is no longer
- // valid.
- func (s *Connection) FindStream(streamId uint32) *Stream {
- var stream *Stream
- var ok bool
- s.streamCond.L.Lock()
- stream, ok = s.streams[spdy.StreamId(streamId)]
- debugMessage("(%p) Found stream %d? %t", s, spdy.StreamId(streamId), ok)
- for !ok && streamId >= uint32(s.receivedStreamId) {
- s.streamCond.Wait()
- stream, ok = s.streams[spdy.StreamId(streamId)]
- }
- s.streamCond.L.Unlock()
- return stream
- }
- func (s *Connection) CloseChan() <-chan bool {
- return s.closeChan
- }
|