http2_server.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "strconv"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/golang/protobuf/proto"
  31. "golang.org/x/net/context"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/channelz"
  38. "google.golang.org/grpc/internal/grpcrand"
  39. "google.golang.org/grpc/keepalive"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/peer"
  42. "google.golang.org/grpc/stats"
  43. "google.golang.org/grpc/status"
  44. "google.golang.org/grpc/tap"
  45. )
  // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  // the stream's state. WriteHeader returns it when the header has already been
  // marked as sent for the stream or when the stream is already done.
  var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  // http2Server implements the ServerTransport interface with HTTP2.
  //
  // One http2Server wraps a single accepted connection. Incoming frames are
  // read by HandleStreams, while outgoing control and data items are queued on
  // controlBuf and written by the loopy writer goroutine started in
  // newHTTP2Server.
  type http2Server struct {
  	// ctx is the transport-wide context; every stream context is derived
  	// from it.
  	ctx     context.Context
  	ctxDone <-chan struct{} // Cache the context.Done() chan
  	// cancel cancels ctx.
  	cancel context.CancelFunc
  	// conn is the underlying connection to the client.
  	conn net.Conn
  	// loopy is assigned by the writer goroutine launched in newHTTP2Server.
  	loopy      *loopyWriter
  	readerDone chan struct{} // sync point to enable testing.
  	writerDone chan struct{} // sync point to enable testing.
  	remoteAddr net.Addr
  	localAddr  net.Addr
  	maxStreamID uint32                // max stream ID ever seen
  	authInfo    credentials.AuthInfo // auth info about the connection
  	// inTapHandle, when non-nil, is invoked for every new stream before it is
  	// registered; an error from it causes the stream to be refused.
  	inTapHandle tap.ServerInHandle
  	// framer reads frames from and writes frames to conn.
  	framer *framer
  	// The max number of concurrent streams.
  	maxStreams uint32
  	// controlBuf delivers all the control related tasks (e.g., window
  	// updates, reset streams, and various settings) to the controller.
  	controlBuf *controlBuffer
  	// fc tracks inbound flow control for the connection as a whole.
  	fc *trInFlow
  	// stats, when non-nil, receives connection and RPC stats events.
  	stats stats.Handler
  	// Flag to keep track of reading activity on transport.
  	// 1 is true and 0 is false.
  	activity uint32 // Accessed atomically.
  	// Keepalive and max-age parameters for the server.
  	kp keepalive.ServerParameters
  	// Keepalive enforcement policy.
  	kep keepalive.EnforcementPolicy
  	// The time instance last ping was received.
  	lastPingAt time.Time
  	// Number of times the client has violated keepalive ping policy so far.
  	pingStrikes uint8
  	// Flag to signify that number of ping strikes should be reset to 0.
  	// This is set whenever data or header frames are sent.
  	// 1 means yes.
  	resetPingStrikes uint32 // Accessed atomically.
  	// initialWindowSize is the inbound flow-control limit given to newly
  	// created streams; updateFlowControl rewrites it under mu.
  	initialWindowSize int32
  	// bdpEst is only set when neither window size was configured explicitly
  	// (dynamic window); it is nil otherwise.
  	bdpEst *bdpEstimator
  	mu     sync.Mutex // guard the following
  	// drainChan is initialized when drain(...) is called the first time.
  	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
  	// Then an independent goroutine will be launched to later send the second GoAway.
  	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
  	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
  	// already underway.
  	drainChan chan struct{}
  	// state is the lifecycle state of the transport; new streams are only
  	// accepted while it is reachable.
  	state transportState
  	// activeStreams maps stream ID to the stream currently being served.
  	activeStreams map[uint32]*Stream
  	// idle is the time instant when the connection went idle.
  	// This is either the beginning of the connection or when the number of
  	// RPCs go down to 0.
  	// When the connection is busy, this value is set to the zero time.
  	idle time.Time
  	// Fields below are for channelz metric collection.
  	channelzID int64 // channelz unique identification number
  	// czmu guards the channelz counters below.
  	czmu sync.RWMutex
  	// kpCount is presumably the number of keepalive pings sent — confirm
  	// against the keepalive loop.
  	kpCount int64
  	// The number of streams that have started, including already finished ones.
  	streamsStarted int64
  	// The number of streams that have ended successfully by sending frame with
  	// EoS bit set.
  	streamsSucceeded int64
  	streamsFailed    int64
  	// lastStreamCreated is the time the most recent stream was accepted.
  	lastStreamCreated time.Time
  	msgSent           int64
  	msgRecv           int64
  	lastMsgSent       time.Time
  	lastMsgRecv       time.Time
  }
  // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  // returned if something goes wrong, except that io.EOF / io.ErrUnexpectedEOF
  // encountered while reading the client's first frame are returned unwrapped.
  //
  // The function performs the server half of the HTTP/2 connection setup on
  // conn: it writes the server SETTINGS (and, if needed, a connection-level
  // WINDOW_UPDATE), validates the client preface, reads the client's initial
  // SETTINGS frame, and finally starts the writer (loopy) and keepalive
  // goroutines. Zero-valued keepalive parameters in config are replaced with
  // the package defaults.
  func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  	// Pick buffer sizes, falling back to defaults when not configured.
  	writeBufSize := defaultWriteBufSize
  	if config.WriteBufferSize > 0 {
  		writeBufSize = config.WriteBufferSize
  	}
  	readBufSize := defaultReadBufSize
  	if config.ReadBufferSize > 0 {
  		readBufSize = config.ReadBufferSize
  	}
  	framer := newFramer(conn, writeBufSize, readBufSize)
  	// Send initial settings as connection preface to client.
  	var isettings []http2.Setting
  	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  	// permitted in the HTTP2 spec.
  	maxStreams := config.MaxStreams
  	if maxStreams == 0 {
  		// 0 means "unlimited": no MAX_CONCURRENT_STREAMS setting is advertised.
  		maxStreams = math.MaxUint32
  	} else {
  		isettings = append(isettings, http2.Setting{
  			ID: http2.SettingMaxConcurrentStreams,
  			Val: maxStreams,
  		})
  	}
  	// The window is adjusted dynamically (BDP estimation) only when neither
  	// the stream nor the connection window was explicitly configured to at
  	// least defaultWindowSize.
  	dynamicWindow := true
  	iwz := int32(initialWindowSize)
  	if config.InitialWindowSize >= defaultWindowSize {
  		iwz = config.InitialWindowSize
  		dynamicWindow = false
  	}
  	icwz := int32(initialWindowSize)
  	if config.InitialConnWindowSize >= defaultWindowSize {
  		icwz = config.InitialConnWindowSize
  		dynamicWindow = false
  	}
  	// Only advertise the stream window when it differs from defaultWindowSize.
  	if iwz != defaultWindowSize {
  		isettings = append(isettings, http2.Setting{
  			ID: http2.SettingInitialWindowSize,
  			Val: uint32(iwz)})
  	}
  	if err := framer.fr.WriteSettings(isettings...); err != nil {
  		return nil, connectionErrorf(false, err, "transport: %v", err)
  	}
  	// Adjust the connection flow control window if needed.
  	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  			return nil, connectionErrorf(false, err, "transport: %v", err)
  		}
  	}
  	// Fill in defaults for any keepalive parameter left at zero.
  	kp := config.KeepaliveParams
  	if kp.MaxConnectionIdle == 0 {
  		kp.MaxConnectionIdle = defaultMaxConnectionIdle
  	}
  	if kp.MaxConnectionAge == 0 {
  		kp.MaxConnectionAge = defaultMaxConnectionAge
  	}
  	// Add a jitter to MaxConnectionAge.
  	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  	if kp.MaxConnectionAgeGrace == 0 {
  		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  	}
  	if kp.Time == 0 {
  		kp.Time = defaultServerKeepaliveTime
  	}
  	if kp.Timeout == 0 {
  		kp.Timeout = defaultServerKeepaliveTimeout
  	}
  	kep := config.KeepalivePolicy
  	if kep.MinTime == 0 {
  		kep.MinTime = defaultKeepalivePolicyMinTime
  	}
  	ctx, cancel := context.WithCancel(context.Background())
  	t := &http2Server{
  		ctx: ctx,
  		cancel: cancel,
  		ctxDone: ctx.Done(),
  		conn: conn,
  		remoteAddr: conn.RemoteAddr(),
  		localAddr: conn.LocalAddr(),
  		authInfo: config.AuthInfo,
  		framer: framer,
  		readerDone: make(chan struct{}),
  		writerDone: make(chan struct{}),
  		maxStreams: maxStreams,
  		inTapHandle: config.InTapHandle,
  		fc: &trInFlow{limit: uint32(icwz)},
  		state: reachable,
  		activeStreams: make(map[uint32]*Stream),
  		stats: config.StatsHandler,
  		kp: kp,
  		idle: time.Now(),
  		kep: kep,
  		initialWindowSize: iwz,
  	}
  	t.controlBuf = newControlBuffer(t.ctxDone)
  	if dynamicWindow {
  		t.bdpEst = &bdpEstimator{
  			bdp: initialWindowSize,
  			updateFlowControl: t.updateFlowControl,
  		}
  	}
  	if t.stats != nil {
  		// Let the stats handler tag the connection context and announce the
  		// beginning of the connection.
  		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
  			RemoteAddr: t.remoteAddr,
  			LocalAddr: t.localAddr,
  		})
  		connBegin := &stats.ConnBegin{}
  		t.stats.HandleConn(t.ctx, connBegin)
  	}
  	if channelz.IsOn() {
  		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
  	}
  	// Push the SETTINGS / WINDOW_UPDATE written above onto the wire.
  	// NOTE(review): the error returned by Flush is ignored here — confirm
  	// that a failure is reliably surfaced by the preface read below.
  	t.framer.writer.Flush()
  	// From here on, any error return tears the transport down. The check
  	// relies on the named result err, which every `return nil, <error>`
  	// below assigns, even where a local err shadows it.
  	defer func() {
  		if err != nil {
  			t.Close()
  		}
  	}()
  	// Check the validity of client preface.
  	preface := make([]byte, len(clientPreface))
  	if _, err := io.ReadFull(t.conn, preface); err != nil {
  		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  	}
  	if !bytes.Equal(preface, clientPreface) {
  		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  	}
  	// The first frame after the preface must be the client's SETTINGS frame.
  	frame, err := t.framer.fr.ReadFrame()
  	if err == io.EOF || err == io.ErrUnexpectedEOF {
  		return nil, err
  	}
  	if err != nil {
  		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  	}
  	atomic.StoreUint32(&t.activity, 1)
  	sf, ok := frame.(*http2.SettingsFrame)
  	if !ok {
  		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  	}
  	t.handleSettings(sf)
  	// Writer goroutine: runs the loopy writer until it returns, then closes
  	// the connection and signals writerDone.
  	go func() {
  		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
  		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  		if err := t.loopy.run(); err != nil {
  			errorf("transport: loopyWriter.run returning. Err: %v", err)
  		}
  		t.conn.Close()
  		close(t.writerDone)
  	}()
  	go t.keepalive()
  	return t, nil
  }
  271. // operateHeader takes action on the decoded headers.
  272. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
  273. streamID := frame.Header().StreamID
  274. var state decodeState
  275. for _, hf := range frame.Fields {
  276. if err := state.processHeaderField(hf); err != nil {
  277. if se, ok := err.(StreamError); ok {
  278. t.controlBuf.put(&cleanupStream{
  279. streamID: streamID,
  280. rst: true,
  281. rstCode: statusCodeConvTab[se.Code],
  282. onWrite: func() {},
  283. })
  284. }
  285. return
  286. }
  287. }
  288. buf := newRecvBuffer()
  289. s := &Stream{
  290. id: streamID,
  291. st: t,
  292. buf: buf,
  293. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  294. recvCompress: state.encoding,
  295. method: state.method,
  296. contentSubtype: state.contentSubtype,
  297. }
  298. if frame.StreamEnded() {
  299. // s is just created by the caller. No lock needed.
  300. s.state = streamReadDone
  301. }
  302. if state.timeoutSet {
  303. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
  304. } else {
  305. s.ctx, s.cancel = context.WithCancel(t.ctx)
  306. }
  307. pr := &peer.Peer{
  308. Addr: t.remoteAddr,
  309. }
  310. // Attach Auth info if there is any.
  311. if t.authInfo != nil {
  312. pr.AuthInfo = t.authInfo
  313. }
  314. s.ctx = peer.NewContext(s.ctx, pr)
  315. // Attach the received metadata to the context.
  316. if len(state.mdata) > 0 {
  317. s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
  318. }
  319. if state.statsTags != nil {
  320. s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
  321. }
  322. if state.statsTrace != nil {
  323. s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
  324. }
  325. if t.inTapHandle != nil {
  326. var err error
  327. info := &tap.Info{
  328. FullMethodName: state.method,
  329. }
  330. s.ctx, err = t.inTapHandle(s.ctx, info)
  331. if err != nil {
  332. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  333. t.controlBuf.put(&cleanupStream{
  334. streamID: s.id,
  335. rst: true,
  336. rstCode: http2.ErrCodeRefusedStream,
  337. onWrite: func() {},
  338. })
  339. return
  340. }
  341. }
  342. t.mu.Lock()
  343. if t.state != reachable {
  344. t.mu.Unlock()
  345. return
  346. }
  347. if uint32(len(t.activeStreams)) >= t.maxStreams {
  348. t.mu.Unlock()
  349. t.controlBuf.put(&cleanupStream{
  350. streamID: streamID,
  351. rst: true,
  352. rstCode: http2.ErrCodeRefusedStream,
  353. onWrite: func() {},
  354. })
  355. return
  356. }
  357. if streamID%2 != 1 || streamID <= t.maxStreamID {
  358. t.mu.Unlock()
  359. // illegal gRPC stream id.
  360. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  361. return true
  362. }
  363. t.maxStreamID = streamID
  364. t.activeStreams[streamID] = s
  365. if len(t.activeStreams) == 1 {
  366. t.idle = time.Time{}
  367. }
  368. t.mu.Unlock()
  369. if channelz.IsOn() {
  370. t.czmu.Lock()
  371. t.streamsStarted++
  372. t.lastStreamCreated = time.Now()
  373. t.czmu.Unlock()
  374. }
  375. s.requestRead = func(n int) {
  376. t.adjustWindow(s, uint32(n))
  377. }
  378. s.ctx = traceCtx(s.ctx, s.method)
  379. if t.stats != nil {
  380. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  381. inHeader := &stats.InHeader{
  382. FullMethod: s.method,
  383. RemoteAddr: t.remoteAddr,
  384. LocalAddr: t.localAddr,
  385. Compression: s.recvCompress,
  386. WireLength: int(frame.Header().Length),
  387. }
  388. t.stats.HandleRPC(s.ctx, inHeader)
  389. }
  390. s.ctxDone = s.ctx.Done()
  391. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  392. s.trReader = &transportReader{
  393. reader: &recvBufferReader{
  394. ctx: s.ctx,
  395. ctxDone: s.ctxDone,
  396. recv: s.buf,
  397. },
  398. windowHandler: func(n int) {
  399. t.updateWindow(s, uint32(n))
  400. },
  401. }
  402. // Register the stream with loopy.
  403. t.controlBuf.put(&registerStream{
  404. streamID: s.id,
  405. wq: s.wq,
  406. })
  407. handle(s)
  408. return
  409. }
  410. // HandleStreams receives incoming streams using the given handler. This is
  411. // typically run in a separate goroutine.
  412. // traceCtx attaches trace to ctx and returns the new context.
  413. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  414. defer close(t.readerDone)
  415. for {
  416. frame, err := t.framer.fr.ReadFrame()
  417. atomic.StoreUint32(&t.activity, 1)
  418. if err != nil {
  419. if se, ok := err.(http2.StreamError); ok {
  420. warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
  421. t.mu.Lock()
  422. s := t.activeStreams[se.StreamID]
  423. t.mu.Unlock()
  424. if s != nil {
  425. t.closeStream(s, true, se.Code, nil, false)
  426. } else {
  427. t.controlBuf.put(&cleanupStream{
  428. streamID: se.StreamID,
  429. rst: true,
  430. rstCode: se.Code,
  431. onWrite: func() {},
  432. })
  433. }
  434. continue
  435. }
  436. if err == io.EOF || err == io.ErrUnexpectedEOF {
  437. t.Close()
  438. return
  439. }
  440. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  441. t.Close()
  442. return
  443. }
  444. switch frame := frame.(type) {
  445. case *http2.MetaHeadersFrame:
  446. if t.operateHeaders(frame, handle, traceCtx) {
  447. t.Close()
  448. break
  449. }
  450. case *http2.DataFrame:
  451. t.handleData(frame)
  452. case *http2.RSTStreamFrame:
  453. t.handleRSTStream(frame)
  454. case *http2.SettingsFrame:
  455. t.handleSettings(frame)
  456. case *http2.PingFrame:
  457. t.handlePing(frame)
  458. case *http2.WindowUpdateFrame:
  459. t.handleWindowUpdate(frame)
  460. case *http2.GoAwayFrame:
  461. // TODO: Handle GoAway from the client appropriately.
  462. default:
  463. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  464. }
  465. }
  466. }
  467. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  468. t.mu.Lock()
  469. defer t.mu.Unlock()
  470. if t.activeStreams == nil {
  471. // The transport is closing.
  472. return nil, false
  473. }
  474. s, ok := t.activeStreams[f.Header().StreamID]
  475. if !ok {
  476. // The stream is already done.
  477. return nil, false
  478. }
  479. return s, true
  480. }
  481. // adjustWindow sends out extra window update over the initial window size
  482. // of stream if the application is requesting data larger in size than
  483. // the window.
  484. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  485. if w := s.fc.maybeAdjust(n); w > 0 {
  486. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  487. }
  488. }
  489. // updateWindow adjusts the inbound quota for the stream and the transport.
  490. // Window updates will deliver to the controller for sending when
  491. // the cumulative quota exceeds the corresponding threshold.
  492. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  493. if w := s.fc.onRead(n); w > 0 {
  494. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  495. increment: w,
  496. })
  497. }
  498. }
  499. // updateFlowControl updates the incoming flow control windows
  500. // for the transport and the stream based on the current bdp
  501. // estimation.
  502. func (t *http2Server) updateFlowControl(n uint32) {
  503. t.mu.Lock()
  504. for _, s := range t.activeStreams {
  505. s.fc.newLimit(n)
  506. }
  507. t.initialWindowSize = int32(n)
  508. t.mu.Unlock()
  509. t.controlBuf.put(&outgoingWindowUpdate{
  510. streamID: 0,
  511. increment: t.fc.newLimit(n),
  512. })
  513. t.controlBuf.put(&outgoingSettings{
  514. ss: []http2.Setting{
  515. {
  516. ID: http2.SettingInitialWindowSize,
  517. Val: n,
  518. },
  519. },
  520. })
  521. }
  522. func (t *http2Server) handleData(f *http2.DataFrame) {
  523. size := f.Header().Length
  524. var sendBDPPing bool
  525. if t.bdpEst != nil {
  526. sendBDPPing = t.bdpEst.add(size)
  527. }
  528. // Decouple connection's flow control from application's read.
  529. // An update on connection's flow control should not depend on
  530. // whether user application has read the data or not. Such a
  531. // restriction is already imposed on the stream's flow control,
  532. // and therefore the sender will be blocked anyways.
  533. // Decoupling the connection flow control will prevent other
  534. // active(fast) streams from starving in presence of slow or
  535. // inactive streams.
  536. if w := t.fc.onData(size); w > 0 {
  537. t.controlBuf.put(&outgoingWindowUpdate{
  538. streamID: 0,
  539. increment: w,
  540. })
  541. }
  542. if sendBDPPing {
  543. // Avoid excessive ping detection (e.g. in an L7 proxy)
  544. // by sending a window update prior to the BDP ping.
  545. if w := t.fc.reset(); w > 0 {
  546. t.controlBuf.put(&outgoingWindowUpdate{
  547. streamID: 0,
  548. increment: w,
  549. })
  550. }
  551. t.controlBuf.put(bdpPing)
  552. }
  553. // Select the right stream to dispatch.
  554. s, ok := t.getStream(f)
  555. if !ok {
  556. return
  557. }
  558. if size > 0 {
  559. if err := s.fc.onData(size); err != nil {
  560. t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
  561. return
  562. }
  563. if f.Header().Flags.Has(http2.FlagDataPadded) {
  564. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  565. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  566. }
  567. }
  568. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  569. // guarantee f.Data() is consumed before the arrival of next frame.
  570. // Can this copy be eliminated?
  571. if len(f.Data()) > 0 {
  572. data := make([]byte, len(f.Data()))
  573. copy(data, f.Data())
  574. s.write(recvMsg{data: data})
  575. }
  576. }
  577. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  578. // Received the end of stream from the client.
  579. s.compareAndSwapState(streamActive, streamReadDone)
  580. s.write(recvMsg{err: io.EOF})
  581. }
  582. }
  583. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  584. s, ok := t.getStream(f)
  585. if !ok {
  586. return
  587. }
  588. t.closeStream(s, false, 0, nil, false)
  589. }
  590. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  591. if f.IsAck() {
  592. return
  593. }
  594. var ss []http2.Setting
  595. f.ForeachSetting(func(s http2.Setting) error {
  596. ss = append(ss, s)
  597. return nil
  598. })
  599. t.controlBuf.put(&incomingSettings{
  600. ss: ss,
  601. })
  602. }
  // Limits used by handlePing to police client keepalive pings.
  const (
  	// maxPingStrikes is the number of policy-violating pings tolerated; the
  	// connection is closed once the strike count exceeds it.
  	maxPingStrikes = 2
  	// defaultPingTimeout is the minimum spacing required between pings while
  	// there are no active streams and pings without streams are not permitted.
  	defaultPingTimeout = 2 * time.Hour
  )
  607. func (t *http2Server) handlePing(f *http2.PingFrame) {
  608. if f.IsAck() {
  609. if f.Data == goAwayPing.data && t.drainChan != nil {
  610. close(t.drainChan)
  611. return
  612. }
  613. // Maybe it's a BDP ping.
  614. if t.bdpEst != nil {
  615. t.bdpEst.calculate(f.Data)
  616. }
  617. return
  618. }
  619. pingAck := &ping{ack: true}
  620. copy(pingAck.data[:], f.Data[:])
  621. t.controlBuf.put(pingAck)
  622. now := time.Now()
  623. defer func() {
  624. t.lastPingAt = now
  625. }()
  626. // A reset ping strikes means that we don't need to check for policy
  627. // violation for this ping and the pingStrikes counter should be set
  628. // to 0.
  629. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  630. t.pingStrikes = 0
  631. return
  632. }
  633. t.mu.Lock()
  634. ns := len(t.activeStreams)
  635. t.mu.Unlock()
  636. if ns < 1 && !t.kep.PermitWithoutStream {
  637. // Keepalive shouldn't be active thus, this new ping should
  638. // have come after at least defaultPingTimeout.
  639. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  640. t.pingStrikes++
  641. }
  642. } else {
  643. // Check if keepalive policy is respected.
  644. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  645. t.pingStrikes++
  646. }
  647. }
  648. if t.pingStrikes > maxPingStrikes {
  649. // Send goaway and close the connection.
  650. errorf("transport: Got too many pings from the client, closing the connection.")
  651. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  652. }
  653. }
  654. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  655. t.controlBuf.put(&incomingWindowUpdate{
  656. streamID: f.Header().StreamID,
  657. increment: f.Increment,
  658. })
  659. }
  660. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  661. for k, vv := range md {
  662. if isReservedHeader(k) {
  663. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  664. continue
  665. }
  666. for _, v := range vv {
  667. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  668. }
  669. }
  670. return headerFields
  671. }
  672. // WriteHeader sends the header metedata md back to the client.
  673. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  674. if s.updateHeaderSent() || s.getState() == streamDone {
  675. return ErrIllegalHeaderWrite
  676. }
  677. s.hdrMu.Lock()
  678. if md.Len() > 0 {
  679. if s.header.Len() > 0 {
  680. s.header = metadata.Join(s.header, md)
  681. } else {
  682. s.header = md
  683. }
  684. }
  685. t.writeHeaderLocked(s)
  686. s.hdrMu.Unlock()
  687. return nil
  688. }
  689. func (t *http2Server) writeHeaderLocked(s *Stream) {
  690. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  691. // first and create a slice of that exact size.
  692. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  693. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  694. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  695. if s.sendCompress != "" {
  696. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  697. }
  698. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  699. t.controlBuf.put(&headerFrame{
  700. streamID: s.id,
  701. hf: headerFields,
  702. endStream: false,
  703. onWrite: func() {
  704. atomic.StoreUint32(&t.resetPingStrikes, 1)
  705. },
  706. })
  707. if t.stats != nil {
  708. // Note: WireLength is not set in outHeader.
  709. // TODO(mmukhi): Revisit this later, if needed.
  710. outHeader := &stats.OutHeader{}
  711. t.stats.HandleRPC(s.Context(), outHeader)
  712. }
  713. }
  714. // WriteStatus sends stream status to the client and terminates the stream.
  715. // There is no further I/O operations being able to perform on this stream.
  716. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  717. // OK is adopted.
  718. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  719. if s.getState() == streamDone {
  720. return nil
  721. }
  722. s.hdrMu.Lock()
  723. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  724. // first and create a slice of that exact size.
  725. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  726. if !s.updateHeaderSent() { // No headers have been sent.
  727. if len(s.header) > 0 { // Send a separate header frame.
  728. t.writeHeaderLocked(s)
  729. } else { // Send a trailer only response.
  730. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  731. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  732. }
  733. }
  734. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  735. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  736. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  737. stBytes, err := proto.Marshal(p)
  738. if err != nil {
  739. // TODO: return error instead, when callers are able to handle it.
  740. grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
  741. } else {
  742. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  743. }
  744. }
  745. // Attach the trailer metadata.
  746. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  747. trailingHeader := &headerFrame{
  748. streamID: s.id,
  749. hf: headerFields,
  750. endStream: true,
  751. onWrite: func() {
  752. atomic.StoreUint32(&t.resetPingStrikes, 1)
  753. },
  754. }
  755. s.hdrMu.Unlock()
  756. t.closeStream(s, false, 0, trailingHeader, true)
  757. if t.stats != nil {
  758. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
  759. }
  760. return nil
  761. }
  762. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  763. // is returns if it fails (e.g., framing error, transport error).
  764. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  765. if !s.isHeaderSent() { // Headers haven't been written yet.
  766. if err := t.WriteHeader(s, nil); err != nil {
  767. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
  768. return streamErrorf(codes.Internal, "transport: %v", err)
  769. }
  770. } else {
  771. // Writing headers checks for this condition.
  772. if s.getState() == streamDone {
  773. // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
  774. s.cancel()
  775. select {
  776. case <-t.ctx.Done():
  777. return ErrConnClosing
  778. default:
  779. }
  780. return ContextErr(s.ctx.Err())
  781. }
  782. }
  783. // Add some data to header frame so that we can equally distribute bytes across frames.
  784. emptyLen := http2MaxFrameLen - len(hdr)
  785. if emptyLen > len(data) {
  786. emptyLen = len(data)
  787. }
  788. hdr = append(hdr, data[:emptyLen]...)
  789. data = data[emptyLen:]
  790. df := &dataFrame{
  791. streamID: s.id,
  792. h: hdr,
  793. d: data,
  794. onEachWrite: func() {
  795. atomic.StoreUint32(&t.resetPingStrikes, 1)
  796. },
  797. }
  798. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  799. select {
  800. case <-t.ctx.Done():
  801. return ErrConnClosing
  802. default:
  803. }
  804. return ContextErr(s.ctx.Err())
  805. }
  806. return t.controlBuf.put(df)
  807. }
  808. // keepalive running in a separate goroutine does the following:
  809. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  810. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  811. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  812. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  813. // after an additional duration of keepalive.Timeout.
  814. func (t *http2Server) keepalive() {
  815. p := &ping{}
  816. var pingSent bool
  817. maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
  818. maxAge := time.NewTimer(t.kp.MaxConnectionAge)
  819. keepalive := time.NewTimer(t.kp.Time)
  820. // NOTE: All exit paths of this function should reset their
  821. // respective timers. A failure to do so will cause the
  822. // following clean-up to deadlock and eventually leak.
  823. defer func() {
  824. if !maxIdle.Stop() {
  825. <-maxIdle.C
  826. }
  827. if !maxAge.Stop() {
  828. <-maxAge.C
  829. }
  830. if !keepalive.Stop() {
  831. <-keepalive.C
  832. }
  833. }()
  834. for {
  835. select {
  836. case <-maxIdle.C:
  837. t.mu.Lock()
  838. idle := t.idle
  839. if idle.IsZero() { // The connection is non-idle.
  840. t.mu.Unlock()
  841. maxIdle.Reset(t.kp.MaxConnectionIdle)
  842. continue
  843. }
  844. val := t.kp.MaxConnectionIdle - time.Since(idle)
  845. t.mu.Unlock()
  846. if val <= 0 {
  847. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  848. // Gracefully close the connection.
  849. t.drain(http2.ErrCodeNo, []byte{})
  850. // Resetting the timer so that the clean-up doesn't deadlock.
  851. maxIdle.Reset(infinity)
  852. return
  853. }
  854. maxIdle.Reset(val)
  855. case <-maxAge.C:
  856. t.drain(http2.ErrCodeNo, []byte{})
  857. maxAge.Reset(t.kp.MaxConnectionAgeGrace)
  858. select {
  859. case <-maxAge.C:
  860. // Close the connection after grace period.
  861. t.Close()
  862. // Resetting the timer so that the clean-up doesn't deadlock.
  863. maxAge.Reset(infinity)
  864. case <-t.ctx.Done():
  865. }
  866. return
  867. case <-keepalive.C:
  868. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  869. pingSent = false
  870. keepalive.Reset(t.kp.Time)
  871. continue
  872. }
  873. if pingSent {
  874. t.Close()
  875. // Resetting the timer so that the clean-up doesn't deadlock.
  876. keepalive.Reset(infinity)
  877. return
  878. }
  879. pingSent = true
  880. if channelz.IsOn() {
  881. t.czmu.Lock()
  882. t.kpCount++
  883. t.czmu.Unlock()
  884. }
  885. t.controlBuf.put(p)
  886. keepalive.Reset(t.kp.Timeout)
  887. case <-t.ctx.Done():
  888. return
  889. }
  890. }
  891. }
  892. // Close starts shutting down the http2Server transport.
  893. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  894. // could cause some resource issue. Revisit this later.
  895. func (t *http2Server) Close() error {
  896. t.mu.Lock()
  897. if t.state == closing {
  898. t.mu.Unlock()
  899. return errors.New("transport: Close() was already called")
  900. }
  901. t.state = closing
  902. streams := t.activeStreams
  903. t.activeStreams = nil
  904. t.mu.Unlock()
  905. t.controlBuf.finish()
  906. t.cancel()
  907. err := t.conn.Close()
  908. if channelz.IsOn() {
  909. channelz.RemoveEntry(t.channelzID)
  910. }
  911. // Cancel all active streams.
  912. for _, s := range streams {
  913. s.cancel()
  914. }
  915. if t.stats != nil {
  916. connEnd := &stats.ConnEnd{}
  917. t.stats.HandleConn(t.ctx, connEnd)
  918. }
  919. return err
  920. }
  921. // closeStream clears the footprint of a stream when the stream is not needed
  922. // any more.
  923. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  924. if s.swapState(streamDone) == streamDone {
  925. // If the stream was already done, return.
  926. return
  927. }
  928. // In case stream sending and receiving are invoked in separate
  929. // goroutines (e.g., bi-directional streaming), cancel needs to be
  930. // called to interrupt the potential blocking on other goroutines.
  931. s.cancel()
  932. cleanup := &cleanupStream{
  933. streamID: s.id,
  934. rst: rst,
  935. rstCode: rstCode,
  936. onWrite: func() {
  937. t.mu.Lock()
  938. if t.activeStreams != nil {
  939. delete(t.activeStreams, s.id)
  940. if len(t.activeStreams) == 0 {
  941. t.idle = time.Now()
  942. }
  943. }
  944. t.mu.Unlock()
  945. if channelz.IsOn() {
  946. t.czmu.Lock()
  947. if eosReceived {
  948. t.streamsSucceeded++
  949. } else {
  950. t.streamsFailed++
  951. }
  952. t.czmu.Unlock()
  953. }
  954. },
  955. }
  956. if hdr != nil {
  957. hdr.cleanup = cleanup
  958. t.controlBuf.put(hdr)
  959. } else {
  960. t.controlBuf.put(cleanup)
  961. }
  962. }
  963. func (t *http2Server) RemoteAddr() net.Addr {
  964. return t.remoteAddr
  965. }
  966. func (t *http2Server) Drain() {
  967. t.drain(http2.ErrCodeNo, []byte{})
  968. }
  969. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  970. t.mu.Lock()
  971. defer t.mu.Unlock()
  972. if t.drainChan != nil {
  973. return
  974. }
  975. t.drainChan = make(chan struct{})
  976. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  977. }
  978. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  979. // Handles outgoing GoAway and returns true if loopy needs to put itself
  980. // in draining mode.
  981. func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  982. t.mu.Lock()
  983. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  984. t.mu.Unlock()
  985. // The transport is closing.
  986. return false, ErrConnClosing
  987. }
  988. sid := t.maxStreamID
  989. if !g.headsUp {
  990. // Stop accepting more streams now.
  991. t.state = draining
  992. if len(t.activeStreams) == 0 {
  993. g.closeConn = true
  994. }
  995. t.mu.Unlock()
  996. if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  997. return false, err
  998. }
  999. if g.closeConn {
  1000. // Abruptly close the connection following the GoAway (via
  1001. // loopywriter). But flush out what's inside the buffer first.
  1002. t.framer.writer.Flush()
  1003. return false, fmt.Errorf("transport: Connection closing")
  1004. }
  1005. return true, nil
  1006. }
  1007. t.mu.Unlock()
  1008. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1009. // Follow that with a ping and wait for the ack to come back or a timer
  1010. // to expire. During this time accept new streams since they might have
  1011. // originated before the GoAway reaches the client.
  1012. // After getting the ack or timer expiration send out another GoAway this
  1013. // time with an ID of the max stream server intends to process.
  1014. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1015. return false, err
  1016. }
  1017. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1018. return false, err
  1019. }
  1020. go func() {
  1021. timer := time.NewTimer(time.Minute)
  1022. defer timer.Stop()
  1023. select {
  1024. case <-t.drainChan:
  1025. case <-timer.C:
  1026. case <-t.ctx.Done():
  1027. return
  1028. }
  1029. t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1030. }()
  1031. return false, nil
  1032. }
  1033. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1034. t.czmu.RLock()
  1035. s := channelz.SocketInternalMetric{
  1036. StreamsStarted: t.streamsStarted,
  1037. StreamsSucceeded: t.streamsSucceeded,
  1038. StreamsFailed: t.streamsFailed,
  1039. MessagesSent: t.msgSent,
  1040. MessagesReceived: t.msgRecv,
  1041. KeepAlivesSent: t.kpCount,
  1042. LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
  1043. LastMessageSentTimestamp: t.lastMsgSent,
  1044. LastMessageReceivedTimestamp: t.lastMsgRecv,
  1045. LocalFlowControlWindow: int64(t.fc.getSize()),
  1046. //socket options
  1047. LocalAddr: t.localAddr,
  1048. RemoteAddr: t.remoteAddr,
  1049. // Security
  1050. // RemoteName :
  1051. }
  1052. t.czmu.RUnlock()
  1053. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1054. return &s
  1055. }
  1056. func (t *http2Server) IncrMsgSent() {
  1057. t.czmu.Lock()
  1058. t.msgSent++
  1059. t.lastMsgSent = time.Now()
  1060. t.czmu.Unlock()
  1061. }
  1062. func (t *http2Server) IncrMsgRecv() {
  1063. t.czmu.Lock()
  1064. t.msgRecv++
  1065. t.lastMsgRecv = time.Now()
  1066. t.czmu.Unlock()
  1067. }
  1068. func (t *http2Server) getOutFlowWindow() int64 {
  1069. resp := make(chan uint32)
  1070. timer := time.NewTimer(time.Second)
  1071. defer timer.Stop()
  1072. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1073. select {
  1074. case sz := <-resp:
  1075. return int64(sz)
  1076. case <-t.ctxDone:
  1077. return -1
  1078. case <-timer.C:
  1079. return -2
  1080. }
  1081. }
  1082. func getJitter(v time.Duration) time.Duration {
  1083. if v == infinity {
  1084. return 0
  1085. }
  1086. // Generate a jitter between +/- 10% of the value.
  1087. r := int64(v / 10)
  1088. j := grpcrand.Int63n(2*r) - r
  1089. return time.Duration(j)
  1090. }