http2_client.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "io"
  21. "math"
  22. "net"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "golang.org/x/net/context"
  28. "golang.org/x/net/http2"
  29. "golang.org/x/net/http2/hpack"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/credentials"
  32. "google.golang.org/grpc/internal/channelz"
  33. "google.golang.org/grpc/keepalive"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/peer"
  36. "google.golang.org/grpc/stats"
  37. "google.golang.org/grpc/status"
  38. )
  39. // http2Client implements the ClientTransport interface with HTTP2.
  40. type http2Client struct {
  41. ctx context.Context
  42. cancel context.CancelFunc
  43. ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  44. userAgent string
  45. md interface{}
  46. conn net.Conn // underlying communication channel
  47. loopy *loopyWriter
  48. remoteAddr net.Addr
  49. localAddr net.Addr
  50. authInfo credentials.AuthInfo // auth info about the connection
  51. readerDone chan struct{} // sync point to enable testing.
  52. writerDone chan struct{} // sync point to enable testing.
  53. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  54. // that the server sent GoAway on this transport.
  55. goAway chan struct{}
  56. // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
  57. awakenKeepalive chan struct{}
  58. framer *framer
  59. // controlBuf delivers all the control related tasks (e.g., window
  60. // updates, reset streams, and various settings) to the controller.
  61. controlBuf *controlBuffer
  62. fc *trInFlow
  63. // The scheme used: https if TLS is on, http otherwise.
  64. scheme string
  65. isSecure bool
  66. creds []credentials.PerRPCCredentials
  67. // Boolean to keep track of reading activity on transport.
  68. // 1 is true and 0 is false.
  69. activity uint32 // Accessed atomically.
  70. kp keepalive.ClientParameters
  71. keepaliveEnabled bool
  72. statsHandler stats.Handler
  73. initialWindowSize int32
  74. bdpEst *bdpEstimator
  75. // onSuccess is a callback that client transport calls upon
  76. // receiving server preface to signal that a succefull HTTP2
  77. // connection was established.
  78. onSuccess func()
  79. maxConcurrentStreams uint32
  80. streamQuota int64
  81. streamsQuotaAvailable chan struct{}
  82. waitingStreams uint32
  83. nextID uint32
  84. mu sync.Mutex // guard the following variables
  85. state transportState
  86. activeStreams map[uint32]*Stream
  87. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  88. prevGoAwayID uint32
  89. // goAwayReason records the http2.ErrCode and debug data received with the
  90. // GoAway frame.
  91. goAwayReason GoAwayReason
  92. // Fields below are for channelz metric collection.
  93. channelzID int64 // channelz unique identification number
  94. czmu sync.RWMutex
  95. kpCount int64
  96. // The number of streams that have started, including already finished ones.
  97. streamsStarted int64
  98. // The number of streams that have ended successfully by receiving EoS bit set
  99. // frame from server.
  100. streamsSucceeded int64
  101. streamsFailed int64
  102. lastStreamCreated time.Time
  103. msgSent int64
  104. msgRecv int64
  105. lastMsgSent time.Time
  106. lastMsgRecv time.Time
  107. }
  108. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  109. if fn != nil {
  110. return fn(ctx, addr)
  111. }
  112. return dialContext(ctx, "tcp", addr)
  113. }
  114. func isTemporary(err error) bool {
  115. switch err := err.(type) {
  116. case interface {
  117. Temporary() bool
  118. }:
  119. return err.Temporary()
  120. case interface {
  121. Timeout() bool
  122. }:
  123. // Timeouts may be resolved upon retry, and are thus treated as
  124. // temporary.
  125. return err.Timeout()
  126. }
  127. return true
  128. }
  129. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  130. // and starts to receive messages on it. Non-nil error returns if construction
  131. // fails.
  132. func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
  133. scheme := "http"
  134. ctx, cancel := context.WithCancel(ctx)
  135. defer func() {
  136. if err != nil {
  137. cancel()
  138. }
  139. }()
  140. conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  141. if err != nil {
  142. if opts.FailOnNonTempDialError {
  143. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  144. }
  145. return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  146. }
  147. // Any further errors will close the underlying connection
  148. defer func(conn net.Conn) {
  149. if err != nil {
  150. conn.Close()
  151. }
  152. }(conn)
  153. var (
  154. isSecure bool
  155. authInfo credentials.AuthInfo
  156. )
  157. if creds := opts.TransportCredentials; creds != nil {
  158. scheme = "https"
  159. conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
  160. if err != nil {
  161. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  162. }
  163. isSecure = true
  164. }
  165. kp := opts.KeepaliveParams
  166. // Validate keepalive parameters.
  167. if kp.Time == 0 {
  168. kp.Time = defaultClientKeepaliveTime
  169. }
  170. if kp.Timeout == 0 {
  171. kp.Timeout = defaultClientKeepaliveTimeout
  172. }
  173. dynamicWindow := true
  174. icwz := int32(initialWindowSize)
  175. if opts.InitialConnWindowSize >= defaultWindowSize {
  176. icwz = opts.InitialConnWindowSize
  177. dynamicWindow = false
  178. }
  179. writeBufSize := defaultWriteBufSize
  180. if opts.WriteBufferSize > 0 {
  181. writeBufSize = opts.WriteBufferSize
  182. }
  183. readBufSize := defaultReadBufSize
  184. if opts.ReadBufferSize > 0 {
  185. readBufSize = opts.ReadBufferSize
  186. }
  187. t := &http2Client{
  188. ctx: ctx,
  189. ctxDone: ctx.Done(), // Cache Done chan.
  190. cancel: cancel,
  191. userAgent: opts.UserAgent,
  192. md: addr.Metadata,
  193. conn: conn,
  194. remoteAddr: conn.RemoteAddr(),
  195. localAddr: conn.LocalAddr(),
  196. authInfo: authInfo,
  197. readerDone: make(chan struct{}),
  198. writerDone: make(chan struct{}),
  199. goAway: make(chan struct{}),
  200. awakenKeepalive: make(chan struct{}, 1),
  201. framer: newFramer(conn, writeBufSize, readBufSize),
  202. fc: &trInFlow{limit: uint32(icwz)},
  203. scheme: scheme,
  204. activeStreams: make(map[uint32]*Stream),
  205. isSecure: isSecure,
  206. creds: opts.PerRPCCredentials,
  207. kp: kp,
  208. statsHandler: opts.StatsHandler,
  209. initialWindowSize: initialWindowSize,
  210. onSuccess: onSuccess,
  211. nextID: 1,
  212. maxConcurrentStreams: defaultMaxStreamsClient,
  213. streamQuota: defaultMaxStreamsClient,
  214. streamsQuotaAvailable: make(chan struct{}, 1),
  215. }
  216. t.controlBuf = newControlBuffer(t.ctxDone)
  217. if opts.InitialWindowSize >= defaultWindowSize {
  218. t.initialWindowSize = opts.InitialWindowSize
  219. dynamicWindow = false
  220. }
  221. if dynamicWindow {
  222. t.bdpEst = &bdpEstimator{
  223. bdp: initialWindowSize,
  224. updateFlowControl: t.updateFlowControl,
  225. }
  226. }
  227. // Make sure awakenKeepalive can't be written upon.
  228. // keepalive routine will make it writable, if need be.
  229. t.awakenKeepalive <- struct{}{}
  230. if t.statsHandler != nil {
  231. t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  232. RemoteAddr: t.remoteAddr,
  233. LocalAddr: t.localAddr,
  234. })
  235. connBegin := &stats.ConnBegin{
  236. Client: true,
  237. }
  238. t.statsHandler.HandleConn(t.ctx, connBegin)
  239. }
  240. if channelz.IsOn() {
  241. t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
  242. }
  243. if t.kp.Time != infinity {
  244. t.keepaliveEnabled = true
  245. go t.keepalive()
  246. }
  247. // Start the reader goroutine for incoming message. Each transport has
  248. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  249. // dispatches the frame to the corresponding stream entity.
  250. go t.reader()
  251. // Send connection preface to server.
  252. n, err := t.conn.Write(clientPreface)
  253. if err != nil {
  254. t.Close()
  255. return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  256. }
  257. if n != len(clientPreface) {
  258. t.Close()
  259. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  260. }
  261. if t.initialWindowSize != defaultWindowSize {
  262. err = t.framer.fr.WriteSettings(http2.Setting{
  263. ID: http2.SettingInitialWindowSize,
  264. Val: uint32(t.initialWindowSize),
  265. })
  266. } else {
  267. err = t.framer.fr.WriteSettings()
  268. }
  269. if err != nil {
  270. t.Close()
  271. return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  272. }
  273. // Adjust the connection flow control window if needed.
  274. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  275. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  276. t.Close()
  277. return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  278. }
  279. }
  280. t.framer.writer.Flush()
  281. go func() {
  282. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  283. err := t.loopy.run()
  284. if err != nil {
  285. errorf("transport: loopyWriter.run returning. Err: %v", err)
  286. }
  287. // If it's a connection error, let reader goroutine handle it
  288. // since there might be data in the buffers.
  289. if _, ok := err.(net.Error); !ok {
  290. t.conn.Close()
  291. }
  292. close(t.writerDone)
  293. }()
  294. return t, nil
  295. }
  296. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  297. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  298. s := &Stream{
  299. done: make(chan struct{}),
  300. method: callHdr.Method,
  301. sendCompress: callHdr.SendCompress,
  302. buf: newRecvBuffer(),
  303. headerChan: make(chan struct{}),
  304. contentSubtype: callHdr.ContentSubtype,
  305. }
  306. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  307. s.requestRead = func(n int) {
  308. t.adjustWindow(s, uint32(n))
  309. }
  310. // The client side stream context should have exactly the same life cycle with the user provided context.
  311. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  312. // So we use the original context here instead of creating a copy.
  313. s.ctx = ctx
  314. s.trReader = &transportReader{
  315. reader: &recvBufferReader{
  316. ctx: s.ctx,
  317. ctxDone: s.ctx.Done(),
  318. recv: s.buf,
  319. },
  320. windowHandler: func(n int) {
  321. t.updateWindow(s, uint32(n))
  322. },
  323. }
  324. return s
  325. }
  326. func (t *http2Client) getPeer() *peer.Peer {
  327. pr := &peer.Peer{
  328. Addr: t.remoteAddr,
  329. }
  330. // Attach Auth info if there is any.
  331. if t.authInfo != nil {
  332. pr.AuthInfo = t.authInfo
  333. }
  334. return pr
  335. }
  336. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  337. aud := t.createAudience(callHdr)
  338. authData, err := t.getTrAuthData(ctx, aud)
  339. if err != nil {
  340. return nil, err
  341. }
  342. callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
  343. if err != nil {
  344. return nil, err
  345. }
  346. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  347. // first and create a slice of that exact size.
  348. // Make the slice of certain predictable size to reduce allocations made by append.
  349. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  350. hfLen += len(authData) + len(callAuthData)
  351. headerFields := make([]hpack.HeaderField, 0, hfLen)
  352. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  353. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  354. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  355. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  356. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  357. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  358. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  359. if callHdr.SendCompress != "" {
  360. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  361. }
  362. if dl, ok := ctx.Deadline(); ok {
  363. // Send out timeout regardless its value. The server can detect timeout context by itself.
  364. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  365. timeout := dl.Sub(time.Now())
  366. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  367. }
  368. for k, v := range authData {
  369. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  370. }
  371. for k, v := range callAuthData {
  372. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  373. }
  374. if b := stats.OutgoingTags(ctx); b != nil {
  375. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  376. }
  377. if b := stats.OutgoingTrace(ctx); b != nil {
  378. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  379. }
  380. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  381. var k string
  382. for _, vv := range added {
  383. for i, v := range vv {
  384. if i%2 == 0 {
  385. k = v
  386. continue
  387. }
  388. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  389. if isReservedHeader(k) {
  390. continue
  391. }
  392. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  393. }
  394. }
  395. for k, vv := range md {
  396. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  397. if isReservedHeader(k) {
  398. continue
  399. }
  400. for _, v := range vv {
  401. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  402. }
  403. }
  404. }
  405. if md, ok := t.md.(*metadata.MD); ok {
  406. for k, vv := range *md {
  407. if isReservedHeader(k) {
  408. continue
  409. }
  410. for _, v := range vv {
  411. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  412. }
  413. }
  414. }
  415. return headerFields, nil
  416. }
  417. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  418. // Create an audience string only if needed.
  419. if len(t.creds) == 0 && callHdr.Creds == nil {
  420. return ""
  421. }
  422. // Construct URI required to get auth request metadata.
  423. // Omit port if it is the default one.
  424. host := strings.TrimSuffix(callHdr.Host, ":443")
  425. pos := strings.LastIndex(callHdr.Method, "/")
  426. if pos == -1 {
  427. pos = len(callHdr.Method)
  428. }
  429. return "https://" + host + callHdr.Method[:pos]
  430. }
  431. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  432. authData := map[string]string{}
  433. for _, c := range t.creds {
  434. data, err := c.GetRequestMetadata(ctx, audience)
  435. if err != nil {
  436. if _, ok := status.FromError(err); ok {
  437. return nil, err
  438. }
  439. return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err)
  440. }
  441. for k, v := range data {
  442. // Capital header names are illegal in HTTP/2.
  443. k = strings.ToLower(k)
  444. authData[k] = v
  445. }
  446. }
  447. return authData, nil
  448. }
  449. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  450. callAuthData := map[string]string{}
  451. // Check if credentials.PerRPCCredentials were provided via call options.
  452. // Note: if these credentials are provided both via dial options and call
  453. // options, then both sets of credentials will be applied.
  454. if callCreds := callHdr.Creds; callCreds != nil {
  455. if !t.isSecure && callCreds.RequireTransportSecurity() {
  456. return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  457. }
  458. data, err := callCreds.GetRequestMetadata(ctx, audience)
  459. if err != nil {
  460. return nil, streamErrorf(codes.Internal, "transport: %v", err)
  461. }
  462. for k, v := range data {
  463. // Capital header names are illegal in HTTP/2
  464. k = strings.ToLower(k)
  465. callAuthData[k] = v
  466. }
  467. }
  468. return callAuthData, nil
  469. }
  470. // NewStream creates a stream and registers it into the transport as "active"
  471. // streams.
  472. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  473. ctx = peer.NewContext(ctx, t.getPeer())
  474. headerFields, err := t.createHeaderFields(ctx, callHdr)
  475. if err != nil {
  476. return nil, err
  477. }
  478. s := t.newStream(ctx, callHdr)
  479. cleanup := func(err error) {
  480. if s.swapState(streamDone) == streamDone {
  481. // If it was already done, return.
  482. return
  483. }
  484. // The stream was unprocessed by the server.
  485. atomic.StoreUint32(&s.unprocessed, 1)
  486. s.write(recvMsg{err: err})
  487. close(s.done)
  488. // If headerChan isn't closed, then close it.
  489. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  490. close(s.headerChan)
  491. }
  492. }
  493. hdr := &headerFrame{
  494. hf: headerFields,
  495. endStream: false,
  496. initStream: func(id uint32) (bool, error) {
  497. t.mu.Lock()
  498. if state := t.state; state != reachable {
  499. t.mu.Unlock()
  500. // Do a quick cleanup.
  501. err := error(errStreamDrain)
  502. if state == closing {
  503. err = ErrConnClosing
  504. }
  505. cleanup(err)
  506. return false, err
  507. }
  508. t.activeStreams[id] = s
  509. if channelz.IsOn() {
  510. t.czmu.Lock()
  511. t.streamsStarted++
  512. t.lastStreamCreated = time.Now()
  513. t.czmu.Unlock()
  514. }
  515. var sendPing bool
  516. // If the number of active streams change from 0 to 1, then check if keepalive
  517. // has gone dormant. If so, wake it up.
  518. if len(t.activeStreams) == 1 && t.keepaliveEnabled {
  519. select {
  520. case t.awakenKeepalive <- struct{}{}:
  521. sendPing = true
  522. // Fill the awakenKeepalive channel again as this channel must be
  523. // kept non-writable except at the point that the keepalive()
  524. // goroutine is waiting either to be awaken or shutdown.
  525. t.awakenKeepalive <- struct{}{}
  526. default:
  527. }
  528. }
  529. t.mu.Unlock()
  530. return sendPing, nil
  531. },
  532. onOrphaned: cleanup,
  533. wq: s.wq,
  534. }
  535. firstTry := true
  536. var ch chan struct{}
  537. checkForStreamQuota := func(it interface{}) bool {
  538. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  539. if firstTry {
  540. t.waitingStreams++
  541. }
  542. ch = t.streamsQuotaAvailable
  543. return false
  544. }
  545. if !firstTry {
  546. t.waitingStreams--
  547. }
  548. t.streamQuota--
  549. h := it.(*headerFrame)
  550. h.streamID = t.nextID
  551. t.nextID += 2
  552. s.id = h.streamID
  553. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  554. if t.streamQuota > 0 && t.waitingStreams > 0 {
  555. select {
  556. case t.streamsQuotaAvailable <- struct{}{}:
  557. default:
  558. }
  559. }
  560. return true
  561. }
  562. for {
  563. success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr)
  564. if err != nil {
  565. return nil, err
  566. }
  567. if success {
  568. break
  569. }
  570. firstTry = false
  571. select {
  572. case <-ch:
  573. case <-s.ctx.Done():
  574. return nil, ContextErr(s.ctx.Err())
  575. case <-t.goAway:
  576. return nil, errStreamDrain
  577. case <-t.ctx.Done():
  578. return nil, ErrConnClosing
  579. }
  580. }
  581. if t.statsHandler != nil {
  582. outHeader := &stats.OutHeader{
  583. Client: true,
  584. FullMethod: callHdr.Method,
  585. RemoteAddr: t.remoteAddr,
  586. LocalAddr: t.localAddr,
  587. Compression: callHdr.SendCompress,
  588. }
  589. t.statsHandler.HandleRPC(s.ctx, outHeader)
  590. }
  591. return s, nil
  592. }
  593. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  594. // This must not be executed in reader's goroutine.
  595. func (t *http2Client) CloseStream(s *Stream, err error) {
  596. var (
  597. rst bool
  598. rstCode http2.ErrCode
  599. )
  600. if err != nil {
  601. rst = true
  602. rstCode = http2.ErrCodeCancel
  603. }
  604. t.closeStream(s, err, rst, rstCode, nil, nil, false)
  605. }
  606. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  607. // Set stream status to done.
  608. if s.swapState(streamDone) == streamDone {
  609. // If it was already done, return.
  610. return
  611. }
  612. // status and trailers can be updated here without any synchronization because the stream goroutine will
  613. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  614. // only after updating this.
  615. s.status = st
  616. if len(mdata) > 0 {
  617. s.trailer = mdata
  618. }
  619. if err != nil {
  620. // This will unblock reads eventually.
  621. s.write(recvMsg{err: err})
  622. }
  623. // This will unblock write.
  624. close(s.done)
  625. // If headerChan isn't closed, then close it.
  626. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  627. close(s.headerChan)
  628. }
  629. cleanup := &cleanupStream{
  630. streamID: s.id,
  631. onWrite: func() {
  632. t.mu.Lock()
  633. if t.activeStreams != nil {
  634. delete(t.activeStreams, s.id)
  635. }
  636. t.mu.Unlock()
  637. if channelz.IsOn() {
  638. t.czmu.Lock()
  639. if eosReceived {
  640. t.streamsSucceeded++
  641. } else {
  642. t.streamsFailed++
  643. }
  644. t.czmu.Unlock()
  645. }
  646. },
  647. rst: rst,
  648. rstCode: rstCode,
  649. }
  650. addBackStreamQuota := func(interface{}) bool {
  651. t.streamQuota++
  652. if t.streamQuota > 0 && t.waitingStreams > 0 {
  653. select {
  654. case t.streamsQuotaAvailable <- struct{}{}:
  655. default:
  656. }
  657. }
  658. return true
  659. }
  660. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  661. }
  662. // Close kicks off the shutdown process of the transport. This should be called
  663. // only once on a transport. Once it is called, the transport should not be
  664. // accessed any more.
  665. func (t *http2Client) Close() error {
  666. t.mu.Lock()
  667. // Make sure we only Close once.
  668. if t.state == closing {
  669. t.mu.Unlock()
  670. return nil
  671. }
  672. t.state = closing
  673. streams := t.activeStreams
  674. t.activeStreams = nil
  675. t.mu.Unlock()
  676. t.controlBuf.finish()
  677. t.cancel()
  678. err := t.conn.Close()
  679. if channelz.IsOn() {
  680. channelz.RemoveEntry(t.channelzID)
  681. }
  682. // Notify all active streams.
  683. for _, s := range streams {
  684. t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false)
  685. }
  686. if t.statsHandler != nil {
  687. connEnd := &stats.ConnEnd{
  688. Client: true,
  689. }
  690. t.statsHandler.HandleConn(t.ctx, connEnd)
  691. }
  692. return err
  693. }
  694. // GracefulClose sets the state to draining, which prevents new streams from
  695. // being created and causes the transport to be closed when the last active
  696. // stream is closed. If there are no active streams, the transport is closed
  697. // immediately. This does nothing if the transport is already draining or
  698. // closing.
  699. func (t *http2Client) GracefulClose() error {
  700. t.mu.Lock()
  701. // Make sure we move to draining only from active.
  702. if t.state == draining || t.state == closing {
  703. t.mu.Unlock()
  704. return nil
  705. }
  706. t.state = draining
  707. active := len(t.activeStreams)
  708. t.mu.Unlock()
  709. if active == 0 {
  710. return t.Close()
  711. }
  712. t.controlBuf.put(&incomingGoAway{})
  713. return nil
  714. }
  715. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  716. // should proceed only if Write returns nil.
  717. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  718. if opts.Last {
  719. // If it's the last message, update stream state.
  720. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  721. return errStreamDone
  722. }
  723. } else if s.getState() != streamActive {
  724. return errStreamDone
  725. }
  726. df := &dataFrame{
  727. streamID: s.id,
  728. endStream: opts.Last,
  729. }
  730. if hdr != nil || data != nil { // If it's not an empty data frame.
  731. // Add some data to grpc message header so that we can equally
  732. // distribute bytes across frames.
  733. emptyLen := http2MaxFrameLen - len(hdr)
  734. if emptyLen > len(data) {
  735. emptyLen = len(data)
  736. }
  737. hdr = append(hdr, data[:emptyLen]...)
  738. data = data[emptyLen:]
  739. df.h, df.d = hdr, data
  740. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  741. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  742. return err
  743. }
  744. }
  745. return t.controlBuf.put(df)
  746. }
  747. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  748. t.mu.Lock()
  749. defer t.mu.Unlock()
  750. s, ok := t.activeStreams[f.Header().StreamID]
  751. return s, ok
  752. }
  753. // adjustWindow sends out extra window update over the initial window size
  754. // of stream if the application is requesting data larger in size than
  755. // the window.
  756. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  757. if w := s.fc.maybeAdjust(n); w > 0 {
  758. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  759. }
  760. }
  761. // updateWindow adjusts the inbound quota for the stream.
  762. // Window updates will be sent out when the cumulative quota
  763. // exceeds the corresponding threshold.
  764. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  765. if w := s.fc.onRead(n); w > 0 {
  766. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  767. }
  768. }
  769. // updateFlowControl updates the incoming flow control windows
  770. // for the transport and the stream based on the current bdp
  771. // estimation.
  772. func (t *http2Client) updateFlowControl(n uint32) {
  773. t.mu.Lock()
  774. for _, s := range t.activeStreams {
  775. s.fc.newLimit(n)
  776. }
  777. t.mu.Unlock()
  778. updateIWS := func(interface{}) bool {
  779. t.initialWindowSize = int32(n)
  780. return true
  781. }
  782. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  783. t.controlBuf.put(&outgoingSettings{
  784. ss: []http2.Setting{
  785. {
  786. ID: http2.SettingInitialWindowSize,
  787. Val: n,
  788. },
  789. },
  790. })
  791. }
  792. func (t *http2Client) handleData(f *http2.DataFrame) {
  793. size := f.Header().Length
  794. var sendBDPPing bool
  795. if t.bdpEst != nil {
  796. sendBDPPing = t.bdpEst.add(size)
  797. }
  798. // Decouple connection's flow control from application's read.
  799. // An update on connection's flow control should not depend on
  800. // whether user application has read the data or not. Such a
  801. // restriction is already imposed on the stream's flow control,
  802. // and therefore the sender will be blocked anyways.
  803. // Decoupling the connection flow control will prevent other
  804. // active(fast) streams from starving in presence of slow or
  805. // inactive streams.
  806. //
  807. if w := t.fc.onData(size); w > 0 {
  808. t.controlBuf.put(&outgoingWindowUpdate{
  809. streamID: 0,
  810. increment: w,
  811. })
  812. }
  813. if sendBDPPing {
  814. // Avoid excessive ping detection (e.g. in an L7 proxy)
  815. // by sending a window update prior to the BDP ping.
  816. if w := t.fc.reset(); w > 0 {
  817. t.controlBuf.put(&outgoingWindowUpdate{
  818. streamID: 0,
  819. increment: w,
  820. })
  821. }
  822. t.controlBuf.put(bdpPing)
  823. }
  824. // Select the right stream to dispatch.
  825. s, ok := t.getStream(f)
  826. if !ok {
  827. return
  828. }
  829. if size > 0 {
  830. if err := s.fc.onData(size); err != nil {
  831. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  832. return
  833. }
  834. if f.Header().Flags.Has(http2.FlagDataPadded) {
  835. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  836. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  837. }
  838. }
  839. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  840. // guarantee f.Data() is consumed before the arrival of next frame.
  841. // Can this copy be eliminated?
  842. if len(f.Data()) > 0 {
  843. data := make([]byte, len(f.Data()))
  844. copy(data, f.Data())
  845. s.write(recvMsg{data: data})
  846. }
  847. }
  848. // The server has closed the stream without sending trailers. Record that
  849. // the read direction is closed, and set the status appropriately.
  850. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  851. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  852. }
  853. }
  854. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  855. s, ok := t.getStream(f)
  856. if !ok {
  857. return
  858. }
  859. if f.ErrCode == http2.ErrCodeRefusedStream {
  860. // The stream was unprocessed by the server.
  861. atomic.StoreUint32(&s.unprocessed, 1)
  862. }
  863. statusCode, ok := http2ErrConvTab[f.ErrCode]
  864. if !ok {
  865. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  866. statusCode = codes.Unknown
  867. }
  868. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  869. }
  870. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  871. if f.IsAck() {
  872. return
  873. }
  874. var maxStreams *uint32
  875. var ss []http2.Setting
  876. f.ForeachSetting(func(s http2.Setting) error {
  877. if s.ID == http2.SettingMaxConcurrentStreams {
  878. maxStreams = new(uint32)
  879. *maxStreams = s.Val
  880. return nil
  881. }
  882. ss = append(ss, s)
  883. return nil
  884. })
  885. if isFirst && maxStreams == nil {
  886. maxStreams = new(uint32)
  887. *maxStreams = math.MaxUint32
  888. }
  889. sf := &incomingSettings{
  890. ss: ss,
  891. }
  892. if maxStreams == nil {
  893. t.controlBuf.put(sf)
  894. return
  895. }
  896. updateStreamQuota := func(interface{}) bool {
  897. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  898. t.maxConcurrentStreams = *maxStreams
  899. t.streamQuota += delta
  900. if delta > 0 && t.waitingStreams > 0 {
  901. close(t.streamsQuotaAvailable) // wake all of them up.
  902. t.streamsQuotaAvailable = make(chan struct{}, 1)
  903. }
  904. return true
  905. }
  906. t.controlBuf.executeAndPut(updateStreamQuota, sf)
  907. }
  908. func (t *http2Client) handlePing(f *http2.PingFrame) {
  909. if f.IsAck() {
  910. // Maybe it's a BDP ping.
  911. if t.bdpEst != nil {
  912. t.bdpEst.calculate(f.Data)
  913. }
  914. return
  915. }
  916. pingAck := &ping{ack: true}
  917. copy(pingAck.data[:], f.Data[:])
  918. t.controlBuf.put(pingAck)
  919. }
  920. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  921. t.mu.Lock()
  922. if t.state == closing {
  923. t.mu.Unlock()
  924. return
  925. }
  926. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  927. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  928. }
  929. id := f.LastStreamID
  930. if id > 0 && id%2 != 1 {
  931. t.mu.Unlock()
  932. t.Close()
  933. return
  934. }
  935. // A client can receive multiple GoAways from the server (see
  936. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  937. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  938. // sent after an RTT delay with the ID of the last stream the server will
  939. // process.
  940. //
  941. // Therefore, when we get the first GoAway we don't necessarily close any
  942. // streams. While in case of second GoAway we close all streams created after
  943. // the GoAwayId. This way streams that were in-flight while the GoAway from
  944. // server was being sent don't get killed.
  945. select {
  946. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  947. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  948. if id > t.prevGoAwayID {
  949. t.mu.Unlock()
  950. t.Close()
  951. return
  952. }
  953. default:
  954. t.setGoAwayReason(f)
  955. close(t.goAway)
  956. t.state = draining
  957. t.controlBuf.put(&incomingGoAway{})
  958. }
  959. // All streams with IDs greater than the GoAwayId
  960. // and smaller than the previous GoAway ID should be killed.
  961. upperLimit := t.prevGoAwayID
  962. if upperLimit == 0 { // This is the first GoAway Frame.
  963. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  964. }
  965. for streamID, stream := range t.activeStreams {
  966. if streamID > id && streamID <= upperLimit {
  967. // The stream was unprocessed by the server.
  968. atomic.StoreUint32(&stream.unprocessed, 1)
  969. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  970. }
  971. }
  972. t.prevGoAwayID = id
  973. active := len(t.activeStreams)
  974. t.mu.Unlock()
  975. if active == 0 {
  976. t.Close()
  977. }
  978. }
  979. // setGoAwayReason sets the value of t.goAwayReason based
  980. // on the GoAway frame received.
  981. // It expects a lock on transport's mutext to be held by
  982. // the caller.
  983. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  984. t.goAwayReason = GoAwayNoReason
  985. switch f.ErrCode {
  986. case http2.ErrCodeEnhanceYourCalm:
  987. if string(f.DebugData()) == "too_many_pings" {
  988. t.goAwayReason = GoAwayTooManyPings
  989. }
  990. }
  991. }
  992. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  993. t.mu.Lock()
  994. defer t.mu.Unlock()
  995. return t.goAwayReason
  996. }
  997. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  998. t.controlBuf.put(&incomingWindowUpdate{
  999. streamID: f.Header().StreamID,
  1000. increment: f.Increment,
  1001. })
  1002. }
  1003. // operateHeaders takes action on the decoded headers.
  1004. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1005. s, ok := t.getStream(frame)
  1006. if !ok {
  1007. return
  1008. }
  1009. atomic.StoreUint32(&s.bytesReceived, 1)
  1010. var state decodeState
  1011. if err := state.decodeResponseHeader(frame); err != nil {
  1012. t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false)
  1013. // Something wrong. Stops reading even when there is remaining.
  1014. return
  1015. }
  1016. endStream := frame.StreamEnded()
  1017. var isHeader bool
  1018. defer func() {
  1019. if t.statsHandler != nil {
  1020. if isHeader {
  1021. inHeader := &stats.InHeader{
  1022. Client: true,
  1023. WireLength: int(frame.Header().Length),
  1024. }
  1025. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1026. } else {
  1027. inTrailer := &stats.InTrailer{
  1028. Client: true,
  1029. WireLength: int(frame.Header().Length),
  1030. }
  1031. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1032. }
  1033. }
  1034. }()
  1035. // If headers haven't been received yet.
  1036. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  1037. if !endStream {
  1038. // Headers frame is not actually a trailers-only frame.
  1039. isHeader = true
  1040. // These values can be set without any synchronization because
  1041. // stream goroutine will read it only after seeing a closed
  1042. // headerChan which we'll close after setting this.
  1043. s.recvCompress = state.encoding
  1044. if len(state.mdata) > 0 {
  1045. s.header = state.mdata
  1046. }
  1047. }
  1048. close(s.headerChan)
  1049. }
  1050. if !endStream {
  1051. return
  1052. }
  1053. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
  1054. }
  1055. // reader runs as a separate goroutine in charge of reading data from network
  1056. // connection.
  1057. //
  1058. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1059. // optimal.
  1060. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1061. func (t *http2Client) reader() {
  1062. defer close(t.readerDone)
  1063. // Check the validity of server preface.
  1064. frame, err := t.framer.fr.ReadFrame()
  1065. if err != nil {
  1066. t.Close()
  1067. return
  1068. }
  1069. if t.keepaliveEnabled {
  1070. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1071. }
  1072. sf, ok := frame.(*http2.SettingsFrame)
  1073. if !ok {
  1074. t.Close()
  1075. return
  1076. }
  1077. t.onSuccess()
  1078. t.handleSettings(sf, true)
  1079. // loop to keep reading incoming messages on this transport.
  1080. for {
  1081. frame, err := t.framer.fr.ReadFrame()
  1082. if t.keepaliveEnabled {
  1083. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1084. }
  1085. if err != nil {
  1086. // Abort an active stream if the http2.Framer returns a
  1087. // http2.StreamError. This can happen only if the server's response
  1088. // is malformed http2.
  1089. if se, ok := err.(http2.StreamError); ok {
  1090. t.mu.Lock()
  1091. s := t.activeStreams[se.StreamID]
  1092. t.mu.Unlock()
  1093. if s != nil {
  1094. // use error detail to provide better err message
  1095. t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false)
  1096. }
  1097. continue
  1098. } else {
  1099. // Transport error.
  1100. t.Close()
  1101. return
  1102. }
  1103. }
  1104. switch frame := frame.(type) {
  1105. case *http2.MetaHeadersFrame:
  1106. t.operateHeaders(frame)
  1107. case *http2.DataFrame:
  1108. t.handleData(frame)
  1109. case *http2.RSTStreamFrame:
  1110. t.handleRSTStream(frame)
  1111. case *http2.SettingsFrame:
  1112. t.handleSettings(frame, false)
  1113. case *http2.PingFrame:
  1114. t.handlePing(frame)
  1115. case *http2.GoAwayFrame:
  1116. t.handleGoAway(frame)
  1117. case *http2.WindowUpdateFrame:
  1118. t.handleWindowUpdate(frame)
  1119. default:
  1120. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1121. }
  1122. }
  1123. }
  1124. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1125. func (t *http2Client) keepalive() {
  1126. p := &ping{data: [8]byte{}}
  1127. timer := time.NewTimer(t.kp.Time)
  1128. for {
  1129. select {
  1130. case <-timer.C:
  1131. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1132. timer.Reset(t.kp.Time)
  1133. continue
  1134. }
  1135. // Check if keepalive should go dormant.
  1136. t.mu.Lock()
  1137. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1138. // Make awakenKeepalive writable.
  1139. <-t.awakenKeepalive
  1140. t.mu.Unlock()
  1141. select {
  1142. case <-t.awakenKeepalive:
  1143. // If the control gets here a ping has been sent
  1144. // need to reset the timer with keepalive.Timeout.
  1145. case <-t.ctx.Done():
  1146. return
  1147. }
  1148. } else {
  1149. t.mu.Unlock()
  1150. if channelz.IsOn() {
  1151. t.czmu.Lock()
  1152. t.kpCount++
  1153. t.czmu.Unlock()
  1154. }
  1155. // Send ping.
  1156. t.controlBuf.put(p)
  1157. }
  1158. // By the time control gets here a ping has been sent one way or the other.
  1159. timer.Reset(t.kp.Timeout)
  1160. select {
  1161. case <-timer.C:
  1162. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  1163. timer.Reset(t.kp.Time)
  1164. continue
  1165. }
  1166. t.Close()
  1167. return
  1168. case <-t.ctx.Done():
  1169. if !timer.Stop() {
  1170. <-timer.C
  1171. }
  1172. return
  1173. }
  1174. case <-t.ctx.Done():
  1175. if !timer.Stop() {
  1176. <-timer.C
  1177. }
  1178. return
  1179. }
  1180. }
  1181. }
  1182. func (t *http2Client) Error() <-chan struct{} {
  1183. return t.ctx.Done()
  1184. }
  1185. func (t *http2Client) GoAway() <-chan struct{} {
  1186. return t.goAway
  1187. }
  1188. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1189. t.czmu.RLock()
  1190. s := channelz.SocketInternalMetric{
  1191. StreamsStarted: t.streamsStarted,
  1192. StreamsSucceeded: t.streamsSucceeded,
  1193. StreamsFailed: t.streamsFailed,
  1194. MessagesSent: t.msgSent,
  1195. MessagesReceived: t.msgRecv,
  1196. KeepAlivesSent: t.kpCount,
  1197. LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
  1198. LastMessageSentTimestamp: t.lastMsgSent,
  1199. LastMessageReceivedTimestamp: t.lastMsgRecv,
  1200. LocalFlowControlWindow: int64(t.fc.getSize()),
  1201. //socket options
  1202. LocalAddr: t.localAddr,
  1203. RemoteAddr: t.remoteAddr,
  1204. // Security
  1205. // RemoteName :
  1206. }
  1207. t.czmu.RUnlock()
  1208. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1209. return &s
  1210. }
  1211. func (t *http2Client) IncrMsgSent() {
  1212. t.czmu.Lock()
  1213. t.msgSent++
  1214. t.lastMsgSent = time.Now()
  1215. t.czmu.Unlock()
  1216. }
  1217. func (t *http2Client) IncrMsgRecv() {
  1218. t.czmu.Lock()
  1219. t.msgRecv++
  1220. t.lastMsgRecv = time.Now()
  1221. t.czmu.Unlock()
  1222. }
  1223. func (t *http2Client) getOutFlowWindow() int64 {
  1224. resp := make(chan uint32, 1)
  1225. timer := time.NewTimer(time.Second)
  1226. defer timer.Stop()
  1227. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1228. select {
  1229. case sz := <-resp:
  1230. return int64(sz)
  1231. case <-t.ctxDone:
  1232. return -1
  1233. case <-timer.C:
  1234. return -2
  1235. }
  1236. }