connection.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
  1. package spdystream
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/docker/spdystream/spdy"
  11. )
// Sentinel errors reported by the connection and its streams.
var (
	// ErrInvalidStreamId is reported when a received stream id is above
	// 0x7fffffff or lower than the next id the connection expects.
	ErrInvalidStreamId = errors.New("Invalid stream id")
	// ErrTimeout is returned by Connection.Wait when the wait timeout elapses
	// before shutdown completes.
	ErrTimeout = errors.New("Timeout occured")
	// ErrReset is delivered on a stream's start channel when a reset frame
	// arrives before the stream has been replied to.
	ErrReset = errors.New("Stream reset")
	// ErrWriteClosedStream is not referenced in this file; presumably it is
	// returned by stream writes after the stream is closed (confirm in the
	// stream implementation).
	ErrWriteClosedStream = errors.New("Write on closed stream")
)
const (
	// FRAME_WORKERS is the number of frame queues, and of frame handler
	// goroutines, that Serve starts for a connection.
	FRAME_WORKERS = 5
	// QUEUE_SIZE is the size argument Serve passes to NewPriorityFrameQueue
	// for each of those queues.
	QUEUE_SIZE = 50
)
// StreamHandler is the callback given to Serve; it is invoked with each
// remotely initiated stream once its SynStream frame has been accepted.
type StreamHandler func(stream *Stream)
// AuthHandler is not referenced in this file.
// NOTE(review): presumably it decides whether a stream described by the given
// header, slot and parent stream id is authorized — confirm against callers.
type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
// idleAwareFramer wraps a spdy.Framer and signals every successful frame read
// or write to a monitor goroutine, which closes the owning connection after a
// configurable period without activity.
type idleAwareFramer struct {
	// f is the underlying framer that performs the actual reads and writes.
	f *spdy.Framer
	// conn is the owning connection; NewConnection sets it before starting
	// monitor.
	conn *Connection
	// writeLock serializes WriteFrame calls and guards resetChan being closed
	// and set to nil by monitor.
	writeLock sync.Mutex
	// resetChan receives a value after each successful frame read or write;
	// monitor sets it to nil once the connection has closed.
	resetChan chan struct{}
	// setTimeoutLock guards setTimeoutChan being closed and set to nil.
	setTimeoutLock sync.Mutex
	// setTimeoutChan carries new idle timeout values to monitor.
	setTimeoutChan chan time.Duration
	// timeout is the current idle timeout; it is written only by monitor and
	// a value of 0 means the idle timer is disabled.
	timeout time.Duration
}
  33. func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
  34. iaf := &idleAwareFramer{
  35. f: framer,
  36. resetChan: make(chan struct{}, 2),
  37. // setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about
  38. // the same time the connection is being closed
  39. setTimeoutChan: make(chan time.Duration, 1),
  40. }
  41. return iaf
  42. }
  43. func (i *idleAwareFramer) monitor() {
  44. var (
  45. timer *time.Timer
  46. expired <-chan time.Time
  47. resetChan = i.resetChan
  48. setTimeoutChan = i.setTimeoutChan
  49. )
  50. Loop:
  51. for {
  52. select {
  53. case timeout := <-i.setTimeoutChan:
  54. i.timeout = timeout
  55. if timeout == 0 {
  56. if timer != nil {
  57. timer.Stop()
  58. }
  59. } else {
  60. if timer == nil {
  61. timer = time.NewTimer(timeout)
  62. expired = timer.C
  63. } else {
  64. timer.Reset(timeout)
  65. }
  66. }
  67. case <-resetChan:
  68. if timer != nil && i.timeout > 0 {
  69. timer.Reset(i.timeout)
  70. }
  71. case <-expired:
  72. i.conn.streamCond.L.Lock()
  73. streams := i.conn.streams
  74. i.conn.streams = make(map[spdy.StreamId]*Stream)
  75. i.conn.streamCond.Broadcast()
  76. i.conn.streamCond.L.Unlock()
  77. go func() {
  78. for _, stream := range streams {
  79. stream.resetStream()
  80. }
  81. i.conn.Close()
  82. }()
  83. case <-i.conn.closeChan:
  84. if timer != nil {
  85. timer.Stop()
  86. }
  87. // Start a goroutine to drain resetChan. This is needed because we've seen
  88. // some unit tests with large numbers of goroutines get into a situation
  89. // where resetChan fills up, at least 1 call to Write() is still trying to
  90. // send to resetChan, the connection gets closed, and this case statement
  91. // attempts to grab the write lock that Write() already has, causing a
  92. // deadlock.
  93. //
  94. // See https://github.com/docker/spdystream/issues/49 for more details.
  95. go func() {
  96. for _ = range resetChan {
  97. }
  98. }()
  99. go func() {
  100. for _ = range setTimeoutChan {
  101. }
  102. }()
  103. i.writeLock.Lock()
  104. close(resetChan)
  105. i.resetChan = nil
  106. i.writeLock.Unlock()
  107. i.setTimeoutLock.Lock()
  108. close(i.setTimeoutChan)
  109. i.setTimeoutChan = nil
  110. i.setTimeoutLock.Unlock()
  111. break Loop
  112. }
  113. }
  114. // Drain resetChan
  115. for _ = range resetChan {
  116. }
  117. }
  118. func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
  119. i.writeLock.Lock()
  120. defer i.writeLock.Unlock()
  121. if i.resetChan == nil {
  122. return io.EOF
  123. }
  124. err := i.f.WriteFrame(frame)
  125. if err != nil {
  126. return err
  127. }
  128. i.resetChan <- struct{}{}
  129. return nil
  130. }
  131. func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
  132. frame, err := i.f.ReadFrame()
  133. if err != nil {
  134. return nil, err
  135. }
  136. // resetChan should never be closed since it is only closed
  137. // when the connection has closed its closeChan. This closure
  138. // only occurs after all Reads have finished
  139. // TODO (dmcgowan): refactor relationship into connection
  140. i.resetChan <- struct{}{}
  141. return frame, nil
  142. }
  143. func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) {
  144. i.setTimeoutLock.Lock()
  145. defer i.setTimeoutLock.Unlock()
  146. if i.setTimeoutChan == nil {
  147. return
  148. }
  149. i.setTimeoutChan <- timeout
  150. }
// Connection is a SPDY session layered on a net.Conn. It tracks the streams
// multiplexed over the connection together with the id counters, ping
// bookkeeping and shutdown state for the session.
type Connection struct {
	// conn is the underlying network connection; shutdown closes it.
	conn net.Conn
	// framer reads and writes SPDY frames and feeds the idle monitor.
	framer *idleAwareFramer

	// closeChan is closed by Serve when its read loop ends.
	closeChan chan bool
	// goneAway is set once a GoAway frame has been sent (Close) or handled
	// (handleGoAwayFrame); it is accessed under receiveIdLock.
	goneAway bool
	// lastStreamChan and goAwayTimeout are configured by NotifyClose.
	lastStreamChan chan<- *Stream
	goAwayTimeout  time.Duration
	// closeTimeout is configured by SetCloseTimeout.
	closeTimeout time.Duration

	// streamLock protects streams and is also the Locker behind streamCond.
	streamLock *sync.RWMutex
	// streamCond is broadcast whenever the streams map changes.
	streamCond *sync.Cond
	// streams holds the currently tracked streams by id.
	streams map[spdy.StreamId]*Stream

	// nextIdLock is held by CreateStream for the whole creation sequence.
	nextIdLock sync.Mutex
	// receiveIdLock is held while incoming stream ids are validated and while
	// goneAway is read or set.
	receiveIdLock sync.Mutex
	// nextStreamId is the id the next locally created stream will use.
	nextStreamId spdy.StreamId
	// receivedStreamId is the lowest id accepted for the next remote stream.
	receivedStreamId spdy.StreamId

	// pingIdLock guards pingId updates in Ping.
	pingIdLock sync.Mutex
	// pingId is the id the next locally initiated ping will use.
	pingId uint32
	// pingChans maps outstanding ping ids to the channel closed on reply.
	pingChans map[uint32]chan error

	// shutdownLock and hasShutdown make shutdown run at most once.
	shutdownLock sync.Mutex
	// shutdownChan carries a shutdown error, if any, and is closed when
	// shutdown finishes.
	shutdownChan chan error
	hasShutdown  bool

	// for testing https://github.com/docker/spdystream/pull/56
	dataFrameHandler func(*spdy.DataFrame) error
}
  175. // NewConnection creates a new spdy connection from an existing
  176. // network connection.
  177. func NewConnection(conn net.Conn, server bool) (*Connection, error) {
  178. framer, framerErr := spdy.NewFramer(conn, conn)
  179. if framerErr != nil {
  180. return nil, framerErr
  181. }
  182. idleAwareFramer := newIdleAwareFramer(framer)
  183. var sid spdy.StreamId
  184. var rid spdy.StreamId
  185. var pid uint32
  186. if server {
  187. sid = 2
  188. rid = 1
  189. pid = 2
  190. } else {
  191. sid = 1
  192. rid = 2
  193. pid = 1
  194. }
  195. streamLock := new(sync.RWMutex)
  196. streamCond := sync.NewCond(streamLock)
  197. session := &Connection{
  198. conn: conn,
  199. framer: idleAwareFramer,
  200. closeChan: make(chan bool),
  201. goAwayTimeout: time.Duration(0),
  202. closeTimeout: time.Duration(0),
  203. streamLock: streamLock,
  204. streamCond: streamCond,
  205. streams: make(map[spdy.StreamId]*Stream),
  206. nextStreamId: sid,
  207. receivedStreamId: rid,
  208. pingId: pid,
  209. pingChans: make(map[uint32]chan error),
  210. shutdownChan: make(chan error),
  211. }
  212. session.dataFrameHandler = session.handleDataFrame
  213. idleAwareFramer.conn = session
  214. go idleAwareFramer.monitor()
  215. return session, nil
  216. }
  217. // Ping sends a ping frame across the connection and
  218. // returns the response time
  219. func (s *Connection) Ping() (time.Duration, error) {
  220. pid := s.pingId
  221. s.pingIdLock.Lock()
  222. if s.pingId > 0x7ffffffe {
  223. s.pingId = s.pingId - 0x7ffffffe
  224. } else {
  225. s.pingId = s.pingId + 2
  226. }
  227. s.pingIdLock.Unlock()
  228. pingChan := make(chan error)
  229. s.pingChans[pid] = pingChan
  230. defer delete(s.pingChans, pid)
  231. frame := &spdy.PingFrame{Id: pid}
  232. startTime := time.Now()
  233. writeErr := s.framer.WriteFrame(frame)
  234. if writeErr != nil {
  235. return time.Duration(0), writeErr
  236. }
  237. select {
  238. case <-s.closeChan:
  239. return time.Duration(0), errors.New("connection closed")
  240. case err, ok := <-pingChan:
  241. if ok && err != nil {
  242. return time.Duration(0), err
  243. }
  244. break
  245. }
  246. return time.Now().Sub(startTime), nil
  247. }
  248. // Serve handles frames sent from the server, including reply frames
  249. // which are needed to fully initiate connections. Both clients and servers
  250. // should call Serve in a separate goroutine before creating streams.
  251. func (s *Connection) Serve(newHandler StreamHandler) {
  252. // use a WaitGroup to wait for all frames to be drained after receiving
  253. // go-away.
  254. var wg sync.WaitGroup
  255. // Parition queues to ensure stream frames are handled
  256. // by the same worker, ensuring order is maintained
  257. frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
  258. for i := 0; i < FRAME_WORKERS; i++ {
  259. frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
  260. // Ensure frame queue is drained when connection is closed
  261. go func(frameQueue *PriorityFrameQueue) {
  262. <-s.closeChan
  263. frameQueue.Drain()
  264. }(frameQueues[i])
  265. wg.Add(1)
  266. go func(frameQueue *PriorityFrameQueue) {
  267. // let the WaitGroup know this worker is done
  268. defer wg.Done()
  269. s.frameHandler(frameQueue, newHandler)
  270. }(frameQueues[i])
  271. }
  272. var (
  273. partitionRoundRobin int
  274. goAwayFrame *spdy.GoAwayFrame
  275. )
  276. Loop:
  277. for {
  278. readFrame, err := s.framer.ReadFrame()
  279. if err != nil {
  280. if err != io.EOF {
  281. fmt.Errorf("frame read error: %s", err)
  282. } else {
  283. debugMessage("(%p) EOF received", s)
  284. }
  285. break
  286. }
  287. var priority uint8
  288. var partition int
  289. switch frame := readFrame.(type) {
  290. case *spdy.SynStreamFrame:
  291. if s.checkStreamFrame(frame) {
  292. priority = frame.Priority
  293. partition = int(frame.StreamId % FRAME_WORKERS)
  294. debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
  295. s.addStreamFrame(frame)
  296. } else {
  297. debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
  298. continue
  299. }
  300. case *spdy.SynReplyFrame:
  301. priority = s.getStreamPriority(frame.StreamId)
  302. partition = int(frame.StreamId % FRAME_WORKERS)
  303. case *spdy.DataFrame:
  304. priority = s.getStreamPriority(frame.StreamId)
  305. partition = int(frame.StreamId % FRAME_WORKERS)
  306. case *spdy.RstStreamFrame:
  307. priority = s.getStreamPriority(frame.StreamId)
  308. partition = int(frame.StreamId % FRAME_WORKERS)
  309. case *spdy.HeadersFrame:
  310. priority = s.getStreamPriority(frame.StreamId)
  311. partition = int(frame.StreamId % FRAME_WORKERS)
  312. case *spdy.PingFrame:
  313. priority = 0
  314. partition = partitionRoundRobin
  315. partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
  316. case *spdy.GoAwayFrame:
  317. // hold on to the go away frame and exit the loop
  318. goAwayFrame = frame
  319. break Loop
  320. default:
  321. priority = 7
  322. partition = partitionRoundRobin
  323. partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
  324. }
  325. frameQueues[partition].Push(readFrame, priority)
  326. }
  327. close(s.closeChan)
  328. // wait for all frame handler workers to indicate they've drained their queues
  329. // before handling the go away frame
  330. wg.Wait()
  331. if goAwayFrame != nil {
  332. s.handleGoAwayFrame(goAwayFrame)
  333. }
  334. // now it's safe to close remote channels and empty s.streams
  335. s.streamCond.L.Lock()
  336. // notify streams that they're now closed, which will
  337. // unblock any stream Read() calls
  338. for _, stream := range s.streams {
  339. stream.closeRemoteChannels()
  340. }
  341. s.streams = make(map[spdy.StreamId]*Stream)
  342. s.streamCond.Broadcast()
  343. s.streamCond.L.Unlock()
  344. }
  345. func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
  346. for {
  347. popFrame := frameQueue.Pop()
  348. if popFrame == nil {
  349. return
  350. }
  351. var frameErr error
  352. switch frame := popFrame.(type) {
  353. case *spdy.SynStreamFrame:
  354. frameErr = s.handleStreamFrame(frame, newHandler)
  355. case *spdy.SynReplyFrame:
  356. frameErr = s.handleReplyFrame(frame)
  357. case *spdy.DataFrame:
  358. frameErr = s.dataFrameHandler(frame)
  359. case *spdy.RstStreamFrame:
  360. frameErr = s.handleResetFrame(frame)
  361. case *spdy.HeadersFrame:
  362. frameErr = s.handleHeaderFrame(frame)
  363. case *spdy.PingFrame:
  364. frameErr = s.handlePingFrame(frame)
  365. case *spdy.GoAwayFrame:
  366. frameErr = s.handleGoAwayFrame(frame)
  367. default:
  368. frameErr = fmt.Errorf("unhandled frame type: %T", frame)
  369. }
  370. if frameErr != nil {
  371. fmt.Errorf("frame handling error: %s", frameErr)
  372. }
  373. }
  374. }
  375. func (s *Connection) getStreamPriority(streamId spdy.StreamId) uint8 {
  376. stream, streamOk := s.getStream(streamId)
  377. if !streamOk {
  378. return 7
  379. }
  380. return stream.priority
  381. }
  382. func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
  383. var parent *Stream
  384. if frame.AssociatedToStreamId != spdy.StreamId(0) {
  385. parent, _ = s.getStream(frame.AssociatedToStreamId)
  386. }
  387. stream := &Stream{
  388. streamId: frame.StreamId,
  389. parent: parent,
  390. conn: s,
  391. startChan: make(chan error),
  392. headers: frame.Headers,
  393. finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
  394. replyCond: sync.NewCond(new(sync.Mutex)),
  395. dataChan: make(chan []byte),
  396. headerChan: make(chan http.Header),
  397. closeChan: make(chan bool),
  398. }
  399. if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
  400. stream.closeRemoteChannels()
  401. }
  402. s.addStream(stream)
  403. }
  404. // checkStreamFrame checks to see if a stream frame is allowed.
  405. // If the stream is invalid, then a reset frame with protocol error
  406. // will be returned.
  407. func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
  408. s.receiveIdLock.Lock()
  409. defer s.receiveIdLock.Unlock()
  410. if s.goneAway {
  411. return false
  412. }
  413. validationErr := s.validateStreamId(frame.StreamId)
  414. if validationErr != nil {
  415. go func() {
  416. resetErr := s.sendResetFrame(spdy.ProtocolError, frame.StreamId)
  417. if resetErr != nil {
  418. fmt.Errorf("reset error: %s", resetErr)
  419. }
  420. }()
  421. return false
  422. }
  423. return true
  424. }
  425. func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
  426. stream, ok := s.getStream(frame.StreamId)
  427. if !ok {
  428. return fmt.Errorf("Missing stream: %d", frame.StreamId)
  429. }
  430. newHandler(stream)
  431. return nil
  432. }
  433. func (s *Connection) handleReplyFrame(frame *spdy.SynReplyFrame) error {
  434. debugMessage("(%p) Reply frame received for %d", s, frame.StreamId)
  435. stream, streamOk := s.getStream(frame.StreamId)
  436. if !streamOk {
  437. debugMessage("Reply frame gone away for %d", frame.StreamId)
  438. // Stream has already gone away
  439. return nil
  440. }
  441. if stream.replied {
  442. // Stream has already received reply
  443. return nil
  444. }
  445. stream.replied = true
  446. // TODO Check for error
  447. if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
  448. s.remoteStreamFinish(stream)
  449. }
  450. close(stream.startChan)
  451. return nil
  452. }
  453. func (s *Connection) handleResetFrame(frame *spdy.RstStreamFrame) error {
  454. stream, streamOk := s.getStream(frame.StreamId)
  455. if !streamOk {
  456. // Stream has already been removed
  457. return nil
  458. }
  459. s.removeStream(stream)
  460. stream.closeRemoteChannels()
  461. if !stream.replied {
  462. stream.replied = true
  463. stream.startChan <- ErrReset
  464. close(stream.startChan)
  465. }
  466. stream.finishLock.Lock()
  467. stream.finished = true
  468. stream.finishLock.Unlock()
  469. return nil
  470. }
  471. func (s *Connection) handleHeaderFrame(frame *spdy.HeadersFrame) error {
  472. stream, streamOk := s.getStream(frame.StreamId)
  473. if !streamOk {
  474. // Stream has already gone away
  475. return nil
  476. }
  477. if !stream.replied {
  478. // No reply received...Protocol error?
  479. return nil
  480. }
  481. // TODO limit headers while not blocking (use buffered chan or goroutine?)
  482. select {
  483. case <-stream.closeChan:
  484. return nil
  485. case stream.headerChan <- frame.Headers:
  486. }
  487. if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
  488. s.remoteStreamFinish(stream)
  489. }
  490. return nil
  491. }
  492. func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
  493. debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
  494. stream, streamOk := s.getStream(frame.StreamId)
  495. if !streamOk {
  496. debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
  497. // Stream has already gone away
  498. return nil
  499. }
  500. if !stream.replied {
  501. debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
  502. // No reply received...Protocol error?
  503. return nil
  504. }
  505. debugMessage("(%p) (%d) Data frame handling", stream, stream.streamId)
  506. if len(frame.Data) > 0 {
  507. stream.dataLock.RLock()
  508. select {
  509. case <-stream.closeChan:
  510. debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
  511. case stream.dataChan <- frame.Data:
  512. debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
  513. }
  514. stream.dataLock.RUnlock()
  515. }
  516. if (frame.Flags & spdy.DataFlagFin) != 0x00 {
  517. s.remoteStreamFinish(stream)
  518. }
  519. return nil
  520. }
  521. func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
  522. if s.pingId&0x01 != frame.Id&0x01 {
  523. return s.framer.WriteFrame(frame)
  524. }
  525. pingChan, pingOk := s.pingChans[frame.Id]
  526. if pingOk {
  527. close(pingChan)
  528. }
  529. return nil
  530. }
  531. func (s *Connection) handleGoAwayFrame(frame *spdy.GoAwayFrame) error {
  532. debugMessage("(%p) Go away received", s)
  533. s.receiveIdLock.Lock()
  534. if s.goneAway {
  535. s.receiveIdLock.Unlock()
  536. return nil
  537. }
  538. s.goneAway = true
  539. s.receiveIdLock.Unlock()
  540. if s.lastStreamChan != nil {
  541. stream, _ := s.getStream(frame.LastGoodStreamId)
  542. go func() {
  543. s.lastStreamChan <- stream
  544. }()
  545. }
  546. // Do not block frame handler waiting for closure
  547. go s.shutdown(s.goAwayTimeout)
  548. return nil
  549. }
  550. func (s *Connection) remoteStreamFinish(stream *Stream) {
  551. stream.closeRemoteChannels()
  552. stream.finishLock.Lock()
  553. if stream.finished {
  554. // Stream is fully closed, cleanup
  555. s.removeStream(stream)
  556. }
  557. stream.finishLock.Unlock()
  558. }
  559. // CreateStream creates a new spdy stream using the parameters for
  560. // creating the stream frame. The stream frame will be sent upon
  561. // calling this function, however this function does not wait for
  562. // the reply frame. If waiting for the reply is desired, use
  563. // the stream Wait or WaitTimeout function on the stream returned
  564. // by this function.
  565. func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
  566. // MUST synchronize stream creation (all the way to writing the frame)
  567. // as stream IDs **MUST** increase monotonically.
  568. s.nextIdLock.Lock()
  569. defer s.nextIdLock.Unlock()
  570. streamId := s.getNextStreamId()
  571. if streamId == 0 {
  572. return nil, fmt.Errorf("Unable to get new stream id")
  573. }
  574. stream := &Stream{
  575. streamId: streamId,
  576. parent: parent,
  577. conn: s,
  578. startChan: make(chan error),
  579. headers: headers,
  580. dataChan: make(chan []byte),
  581. headerChan: make(chan http.Header),
  582. closeChan: make(chan bool),
  583. }
  584. debugMessage("(%p) (%p) Create stream", s, stream)
  585. s.addStream(stream)
  586. return stream, s.sendStream(stream, fin)
  587. }
  588. func (s *Connection) shutdown(closeTimeout time.Duration) {
  589. // TODO Ensure this isn't called multiple times
  590. s.shutdownLock.Lock()
  591. if s.hasShutdown {
  592. s.shutdownLock.Unlock()
  593. return
  594. }
  595. s.hasShutdown = true
  596. s.shutdownLock.Unlock()
  597. var timeout <-chan time.Time
  598. if closeTimeout > time.Duration(0) {
  599. timeout = time.After(closeTimeout)
  600. }
  601. streamsClosed := make(chan bool)
  602. go func() {
  603. s.streamCond.L.Lock()
  604. for len(s.streams) > 0 {
  605. debugMessage("Streams opened: %d, %#v", len(s.streams), s.streams)
  606. s.streamCond.Wait()
  607. }
  608. s.streamCond.L.Unlock()
  609. close(streamsClosed)
  610. }()
  611. var err error
  612. select {
  613. case <-streamsClosed:
  614. // No active streams, close should be safe
  615. err = s.conn.Close()
  616. case <-timeout:
  617. // Force ungraceful close
  618. err = s.conn.Close()
  619. // Wait for cleanup to clear active streams
  620. <-streamsClosed
  621. }
  622. if err != nil {
  623. duration := 10 * time.Minute
  624. time.AfterFunc(duration, func() {
  625. select {
  626. case err, ok := <-s.shutdownChan:
  627. if ok {
  628. fmt.Errorf("Unhandled close error after %s: %s", duration, err)
  629. }
  630. default:
  631. }
  632. })
  633. s.shutdownChan <- err
  634. }
  635. close(s.shutdownChan)
  636. return
  637. }
  638. // Closes spdy connection by sending GoAway frame and initiating shutdown
  639. func (s *Connection) Close() error {
  640. s.receiveIdLock.Lock()
  641. if s.goneAway {
  642. s.receiveIdLock.Unlock()
  643. return nil
  644. }
  645. s.goneAway = true
  646. s.receiveIdLock.Unlock()
  647. var lastStreamId spdy.StreamId
  648. if s.receivedStreamId > 2 {
  649. lastStreamId = s.receivedStreamId - 2
  650. }
  651. goAwayFrame := &spdy.GoAwayFrame{
  652. LastGoodStreamId: lastStreamId,
  653. Status: spdy.GoAwayOK,
  654. }
  655. err := s.framer.WriteFrame(goAwayFrame)
  656. if err != nil {
  657. return err
  658. }
  659. go s.shutdown(s.closeTimeout)
  660. return nil
  661. }
  662. // CloseWait closes the connection and waits for shutdown
  663. // to finish. Note the underlying network Connection
  664. // is not closed until the end of shutdown.
  665. func (s *Connection) CloseWait() error {
  666. closeErr := s.Close()
  667. if closeErr != nil {
  668. return closeErr
  669. }
  670. shutdownErr, ok := <-s.shutdownChan
  671. if ok {
  672. return shutdownErr
  673. }
  674. return nil
  675. }
  676. // Wait waits for the connection to finish shutdown or for
  677. // the wait timeout duration to expire. This needs to be
  678. // called either after Close has been called or the GOAWAYFRAME
  679. // has been received. If the wait timeout is 0, this function
  680. // will block until shutdown finishes. If wait is never called
  681. // and a shutdown error occurs, that error will be logged as an
  682. // unhandled error.
  683. func (s *Connection) Wait(waitTimeout time.Duration) error {
  684. var timeout <-chan time.Time
  685. if waitTimeout > time.Duration(0) {
  686. timeout = time.After(waitTimeout)
  687. }
  688. select {
  689. case err, ok := <-s.shutdownChan:
  690. if ok {
  691. return err
  692. }
  693. case <-timeout:
  694. return ErrTimeout
  695. }
  696. return nil
  697. }
  698. // NotifyClose registers a channel to be called when the remote
  699. // peer inidicates connection closure. The last stream to be
  700. // received by the remote will be sent on the channel. The notify
  701. // timeout will determine the duration between go away received
  702. // and the connection being closed.
  703. func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
  704. s.goAwayTimeout = timeout
  705. s.lastStreamChan = c
  706. }
  707. // SetCloseTimeout sets the amount of time close will wait for
  708. // streams to finish before terminating the underlying network
  709. // connection. Setting the timeout to 0 will cause close to
  710. // wait forever, which is the default.
  711. func (s *Connection) SetCloseTimeout(timeout time.Duration) {
  712. s.closeTimeout = timeout
  713. }
  714. // SetIdleTimeout sets the amount of time the connection may sit idle before
  715. // it is forcefully terminated.
  716. func (s *Connection) SetIdleTimeout(timeout time.Duration) {
  717. s.framer.setIdleTimeout(timeout)
  718. }
  719. func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
  720. var flags spdy.ControlFlags
  721. if fin {
  722. flags = spdy.ControlFlagFin
  723. }
  724. headerFrame := &spdy.HeadersFrame{
  725. StreamId: stream.streamId,
  726. Headers: headers,
  727. CFHeader: spdy.ControlFrameHeader{Flags: flags},
  728. }
  729. return s.framer.WriteFrame(headerFrame)
  730. }
  731. func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) error {
  732. var flags spdy.ControlFlags
  733. if fin {
  734. flags = spdy.ControlFlagFin
  735. }
  736. replyFrame := &spdy.SynReplyFrame{
  737. StreamId: stream.streamId,
  738. Headers: headers,
  739. CFHeader: spdy.ControlFrameHeader{Flags: flags},
  740. }
  741. return s.framer.WriteFrame(replyFrame)
  742. }
  743. func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.StreamId) error {
  744. resetFrame := &spdy.RstStreamFrame{
  745. StreamId: streamId,
  746. Status: status,
  747. }
  748. return s.framer.WriteFrame(resetFrame)
  749. }
  750. func (s *Connection) sendReset(status spdy.RstStreamStatus, stream *Stream) error {
  751. return s.sendResetFrame(status, stream.streamId)
  752. }
  753. func (s *Connection) sendStream(stream *Stream, fin bool) error {
  754. var flags spdy.ControlFlags
  755. if fin {
  756. flags = spdy.ControlFlagFin
  757. stream.finished = true
  758. }
  759. var parentId spdy.StreamId
  760. if stream.parent != nil {
  761. parentId = stream.parent.streamId
  762. }
  763. streamFrame := &spdy.SynStreamFrame{
  764. StreamId: spdy.StreamId(stream.streamId),
  765. AssociatedToStreamId: spdy.StreamId(parentId),
  766. Headers: stream.headers,
  767. CFHeader: spdy.ControlFrameHeader{Flags: flags},
  768. }
  769. return s.framer.WriteFrame(streamFrame)
  770. }
  771. // getNextStreamId returns the next sequential id
  772. // every call should produce a unique value or an error
  773. func (s *Connection) getNextStreamId() spdy.StreamId {
  774. sid := s.nextStreamId
  775. if sid > 0x7fffffff {
  776. return 0
  777. }
  778. s.nextStreamId = s.nextStreamId + 2
  779. return sid
  780. }
  781. // PeekNextStreamId returns the next sequential id and keeps the next id untouched
  782. func (s *Connection) PeekNextStreamId() spdy.StreamId {
  783. sid := s.nextStreamId
  784. return sid
  785. }
  786. func (s *Connection) validateStreamId(rid spdy.StreamId) error {
  787. if rid > 0x7fffffff || rid < s.receivedStreamId {
  788. return ErrInvalidStreamId
  789. }
  790. s.receivedStreamId = rid + 2
  791. return nil
  792. }
  793. func (s *Connection) addStream(stream *Stream) {
  794. s.streamCond.L.Lock()
  795. s.streams[stream.streamId] = stream
  796. debugMessage("(%p) (%p) Stream added, broadcasting: %d", s, stream, stream.streamId)
  797. s.streamCond.Broadcast()
  798. s.streamCond.L.Unlock()
  799. }
  800. func (s *Connection) removeStream(stream *Stream) {
  801. s.streamCond.L.Lock()
  802. delete(s.streams, stream.streamId)
  803. debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
  804. s.streamCond.Broadcast()
  805. s.streamCond.L.Unlock()
  806. }
  807. func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool) {
  808. s.streamLock.RLock()
  809. stream, ok = s.streams[streamId]
  810. s.streamLock.RUnlock()
  811. return
  812. }
  813. // FindStream looks up the given stream id and either waits for the
  814. // stream to be found or returns nil if the stream id is no longer
  815. // valid.
  816. func (s *Connection) FindStream(streamId uint32) *Stream {
  817. var stream *Stream
  818. var ok bool
  819. s.streamCond.L.Lock()
  820. stream, ok = s.streams[spdy.StreamId(streamId)]
  821. debugMessage("(%p) Found stream %d? %t", s, spdy.StreamId(streamId), ok)
  822. for !ok && streamId >= uint32(s.receivedStreamId) {
  823. s.streamCond.Wait()
  824. stream, ok = s.streams[spdy.StreamId(streamId)]
  825. }
  826. s.streamCond.L.Unlock()
  827. return stream
  828. }
  829. func (s *Connection) CloseChan() <-chan bool {
  830. return s.closeChan
  831. }