flowcontrol.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "fmt"
  21. "math"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. )
  26. const (
  27. // The default value of flow control window size in HTTP2 spec.
  28. defaultWindowSize = 65535
  29. // The initial window size for flow control.
  30. initialWindowSize = defaultWindowSize // for an RPC
  31. infinity = time.Duration(math.MaxInt64)
  32. defaultClientKeepaliveTime = infinity
  33. defaultClientKeepaliveTimeout = 20 * time.Second
  34. defaultMaxStreamsClient = 100
  35. defaultMaxConnectionIdle = infinity
  36. defaultMaxConnectionAge = infinity
  37. defaultMaxConnectionAgeGrace = infinity
  38. defaultServerKeepaliveTime = 2 * time.Hour
  39. defaultServerKeepaliveTimeout = 20 * time.Second
  40. defaultKeepalivePolicyMinTime = 5 * time.Minute
  41. // max window limit set by HTTP2 Specs.
  42. maxWindowSize = math.MaxInt32
  43. // defaultWriteQuota is the default value for number of data
  44. // bytes that each stream can schedule before some of it being
  45. // flushed out.
  46. defaultWriteQuota = 64 * 1024
  47. )
  48. // writeQuota is a soft limit on the amount of data a stream can
  49. // schedule before some of it is written out.
  50. type writeQuota struct {
  51. quota int32
  52. // get waits on read from when quota goes less than or equal to zero.
  53. // replenish writes on it when quota goes positive again.
  54. ch chan struct{}
  55. // done is triggered in error case.
  56. done <-chan struct{}
  57. // replenish is called by loopyWriter to give quota back to.
  58. // It is implemented as a field so that it can be updated
  59. // by tests.
  60. replenish func(n int)
  61. }
  62. func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
  63. w := &writeQuota{
  64. quota: sz,
  65. ch: make(chan struct{}, 1),
  66. done: done,
  67. }
  68. w.replenish = w.realReplenish
  69. return w
  70. }
  71. func (w *writeQuota) get(sz int32) error {
  72. for {
  73. if atomic.LoadInt32(&w.quota) > 0 {
  74. atomic.AddInt32(&w.quota, -sz)
  75. return nil
  76. }
  77. select {
  78. case <-w.ch:
  79. continue
  80. case <-w.done:
  81. return errStreamDone
  82. }
  83. }
  84. }
  85. func (w *writeQuota) realReplenish(n int) {
  86. sz := int32(n)
  87. a := atomic.AddInt32(&w.quota, sz)
  88. b := a - sz
  89. if b <= 0 && a > 0 {
  90. select {
  91. case w.ch <- struct{}{}:
  92. default:
  93. }
  94. }
  95. }
  96. type trInFlow struct {
  97. limit uint32
  98. unacked uint32
  99. effectiveWindowSize uint32
  100. }
  101. func (f *trInFlow) newLimit(n uint32) uint32 {
  102. d := n - f.limit
  103. f.limit = n
  104. f.updateEffectiveWindowSize()
  105. return d
  106. }
  107. func (f *trInFlow) onData(n uint32) uint32 {
  108. f.unacked += n
  109. if f.unacked >= f.limit/4 {
  110. w := f.unacked
  111. f.unacked = 0
  112. f.updateEffectiveWindowSize()
  113. return w
  114. }
  115. f.updateEffectiveWindowSize()
  116. return 0
  117. }
  118. func (f *trInFlow) reset() uint32 {
  119. w := f.unacked
  120. f.unacked = 0
  121. f.updateEffectiveWindowSize()
  122. return w
  123. }
  124. func (f *trInFlow) updateEffectiveWindowSize() {
  125. atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
  126. }
  127. func (f *trInFlow) getSize() uint32 {
  128. return atomic.LoadUint32(&f.effectiveWindowSize)
  129. }
  130. // TODO(mmukhi): Simplify this code.
  131. // inFlow deals with inbound flow control
  132. type inFlow struct {
  133. mu sync.Mutex
  134. // The inbound flow control limit for pending data.
  135. limit uint32
  136. // pendingData is the overall data which have been received but not been
  137. // consumed by applications.
  138. pendingData uint32
  139. // The amount of data the application has consumed but grpc has not sent
  140. // window update for them. Used to reduce window update frequency.
  141. pendingUpdate uint32
  142. // delta is the extra window update given by receiver when an application
  143. // is reading data bigger in size than the inFlow limit.
  144. delta uint32
  145. }
  146. // newLimit updates the inflow window to a new value n.
  147. // It assumes that n is always greater than the old limit.
  148. func (f *inFlow) newLimit(n uint32) uint32 {
  149. f.mu.Lock()
  150. d := n - f.limit
  151. f.limit = n
  152. f.mu.Unlock()
  153. return d
  154. }
  155. func (f *inFlow) maybeAdjust(n uint32) uint32 {
  156. if n > uint32(math.MaxInt32) {
  157. n = uint32(math.MaxInt32)
  158. }
  159. f.mu.Lock()
  160. // estSenderQuota is the receiver's view of the maximum number of bytes the sender
  161. // can send without a window update.
  162. estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
  163. // estUntransmittedData is the maximum number of bytes the sends might not have put
  164. // on the wire yet. A value of 0 or less means that we have already received all or
  165. // more bytes than the application is requesting to read.
  166. estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
  167. // This implies that unless we send a window update, the sender won't be able to send all the bytes
  168. // for this message. Therefore we must send an update over the limit since there's an active read
  169. // request from the application.
  170. if estUntransmittedData > estSenderQuota {
  171. // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
  172. if f.limit+n > maxWindowSize {
  173. f.delta = maxWindowSize - f.limit
  174. } else {
  175. // Send a window update for the whole message and not just the difference between
  176. // estUntransmittedData and estSenderQuota. This will be helpful in case the message
  177. // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
  178. f.delta = n
  179. }
  180. f.mu.Unlock()
  181. return f.delta
  182. }
  183. f.mu.Unlock()
  184. return 0
  185. }
  186. // onData is invoked when some data frame is received. It updates pendingData.
  187. func (f *inFlow) onData(n uint32) error {
  188. f.mu.Lock()
  189. f.pendingData += n
  190. if f.pendingData+f.pendingUpdate > f.limit+f.delta {
  191. limit := f.limit
  192. rcvd := f.pendingData + f.pendingUpdate
  193. f.mu.Unlock()
  194. return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
  195. }
  196. f.mu.Unlock()
  197. return nil
  198. }
  199. // onRead is invoked when the application reads the data. It returns the window size
  200. // to be sent to the peer.
  201. func (f *inFlow) onRead(n uint32) uint32 {
  202. f.mu.Lock()
  203. if f.pendingData == 0 {
  204. f.mu.Unlock()
  205. return 0
  206. }
  207. f.pendingData -= n
  208. if n > f.delta {
  209. n -= f.delta
  210. f.delta = 0
  211. } else {
  212. f.delta -= n
  213. n = 0
  214. }
  215. f.pendingUpdate += n
  216. if f.pendingUpdate >= f.limit/4 {
  217. wu := f.pendingUpdate
  218. f.pendingUpdate = 0
  219. f.mu.Unlock()
  220. return wu
  221. }
  222. f.mu.Unlock()
  223. return 0
  224. }