flowrate.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. //
  2. // Written by Maxim Khitrov (November 2012)
  3. //
  4. // Package flowrate provides the tools for monitoring and limiting the flow rate
  5. // of an arbitrary data stream.
  6. package flowrate
  7. import (
  8. "math"
  9. "sync"
  10. "time"
  11. )
  // Monitor measures the transfer rate of a data stream and carries the state
  // that Limit uses to throttle it. All fields are guarded by mu.
  type Monitor struct {
  	// mu serializes access to every field below.
  	mu sync.Mutex

  	// active is true while the transfer is in progress; Done clears it.
  	active bool
  	// start is the clock() value captured when the transfer began.
  	start time.Duration
  	// bytes is the running total of bytes transferred (completed samples).
  	bytes int64
  	// samples counts how many samples have been closed so far.
  	samples int64

  	// rSample is the latest per-sample transfer rate, in bytes per second.
  	rSample float64
  	// rEMA is the exponential moving average of rSample.
  	rEMA float64
  	// rPeak is the largest rSample observed.
  	rPeak float64
  	// rWindow is the EMA window, in seconds.
  	rWindow float64

  	// sBytes is the byte count accumulated since sLast.
  	sBytes int64
  	// sLast is the time of the latest sample (the stop time once inactive).
  	sLast time.Duration
  	// sRate is the sampling interval.
  	sRate time.Duration

  	// tBytes is the expected size of the current transfer, in bytes.
  	tBytes int64
  	// tLast is the time of the latest update that carried at least one byte.
  	tLast time.Duration
  }
  29. // New creates a new flow control monitor. Instantaneous transfer rate is
  30. // measured and updated for each sampleRate interval. windowSize determines the
  31. // weight of each sample in the exponential moving average (EMA) calculation.
  32. // The exact formulas are:
  33. //
  34. // sampleTime = currentTime - prevSampleTime
  35. // sampleRate = byteCount / sampleTime
  36. // weight = 1 - exp(-sampleTime/windowSize)
  37. // newRate = weight*sampleRate + (1-weight)*oldRate
  38. //
  39. // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
  40. // respectively.
  41. func New(sampleRate, windowSize time.Duration) *Monitor {
  42. if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
  43. sampleRate = 5 * clockRate
  44. }
  45. if windowSize <= 0 {
  46. windowSize = 1 * time.Second
  47. }
  48. now := clock()
  49. return &Monitor{
  50. active: true,
  51. start: now,
  52. rWindow: windowSize.Seconds(),
  53. sLast: now,
  54. sRate: sampleRate,
  55. tLast: now,
  56. }
  57. }
  58. // Update records the transfer of n bytes and returns n. It should be called
  59. // after each Read/Write operation, even if n is 0.
  60. func (m *Monitor) Update(n int) int {
  61. m.mu.Lock()
  62. m.update(n)
  63. m.mu.Unlock()
  64. return n
  65. }
  66. // IO is a convenience method intended to wrap io.Reader and io.Writer method
  67. // execution. It calls m.Update(n) and then returns (n, err) unmodified.
  68. func (m *Monitor) IO(n int, err error) (int, error) {
  69. return m.Update(n), err
  70. }
  71. // Done marks the transfer as finished and prevents any further updates or
  72. // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
  73. // Limit methods become NOOPs. It returns the total number of bytes transferred.
  74. func (m *Monitor) Done() int64 {
  75. m.mu.Lock()
  76. if now := m.update(0); m.sBytes > 0 {
  77. m.reset(now)
  78. }
  79. m.active = false
  80. m.tLast = 0
  81. n := m.bytes
  82. m.mu.Unlock()
  83. return n
  84. }
  // timeRemLimit is the upper bound applied to Status.TimeRem: 999 hours,
  // 59 minutes and 59 seconds.
  const timeRemLimit time.Duration = (999*60*60 + 59*60 + 59) * time.Second
  87. // Status represents the current Monitor status. All transfer rates are in bytes
  88. // per second rounded to the nearest byte.
  89. type Status struct {
  90. Active bool // Flag indicating an active transfer
  91. Start time.Time // Transfer start time
  92. Duration time.Duration // Time period covered by the statistics
  93. Idle time.Duration // Time since the last transfer of at least 1 byte
  94. Bytes int64 // Total number of bytes transferred
  95. Samples int64 // Total number of samples taken
  96. InstRate int64 // Instantaneous transfer rate
  97. CurRate int64 // Current transfer rate (EMA of InstRate)
  98. AvgRate int64 // Average transfer rate (Bytes / Duration)
  99. PeakRate int64 // Maximum instantaneous transfer rate
  100. BytesRem int64 // Number of bytes remaining in the transfer
  101. TimeRem time.Duration // Estimated time to completion
  102. Progress Percent // Overall transfer progress
  103. }
  104. // Status returns current transfer status information. The returned value
  105. // becomes static after a call to Done.
  106. func (m *Monitor) Status() Status {
  107. m.mu.Lock()
  108. now := m.update(0)
  109. s := Status{
  110. Active: m.active,
  111. Start: clockToTime(m.start),
  112. Duration: m.sLast - m.start,
  113. Idle: now - m.tLast,
  114. Bytes: m.bytes,
  115. Samples: m.samples,
  116. PeakRate: round(m.rPeak),
  117. BytesRem: m.tBytes - m.bytes,
  118. Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
  119. }
  120. if s.BytesRem < 0 {
  121. s.BytesRem = 0
  122. }
  123. if s.Duration > 0 {
  124. rAvg := float64(s.Bytes) / s.Duration.Seconds()
  125. s.AvgRate = round(rAvg)
  126. if s.Active {
  127. s.InstRate = round(m.rSample)
  128. s.CurRate = round(m.rEMA)
  129. if s.BytesRem > 0 {
  130. if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
  131. ns := float64(s.BytesRem) / tRate * 1e9
  132. if ns > float64(timeRemLimit) {
  133. ns = float64(timeRemLimit)
  134. }
  135. s.TimeRem = clockRound(time.Duration(ns))
  136. }
  137. }
  138. }
  139. }
  140. m.mu.Unlock()
  141. return s
  142. }
  143. // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
  144. // second. It returns the maximum number of bytes (0 <= n <= want) that may be
  145. // transferred immediately without exceeding the limit. If block == true, the
  146. // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
  147. // or the transfer is inactive (after a call to Done).
  148. //
  149. // At least one byte is always allowed to be transferred in any given sampling
  150. // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
  151. // is 10 bytes per second.
  152. //
  153. // For usage examples, see the implementation of Reader and Writer in io.go.
  154. func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
  155. if want < 1 || rate < 1 {
  156. return want
  157. }
  158. m.mu.Lock()
  159. // Determine the maximum number of bytes that can be sent in one sample
  160. limit := round(float64(rate) * m.sRate.Seconds())
  161. if limit <= 0 {
  162. limit = 1
  163. }
  164. // If block == true, wait until m.sBytes < limit
  165. if now := m.update(0); block {
  166. for m.sBytes >= limit && m.active {
  167. now = m.waitNextSample(now)
  168. }
  169. }
  170. // Make limit <= want (unlimited if the transfer is no longer active)
  171. if limit -= m.sBytes; limit > int64(want) || !m.active {
  172. limit = int64(want)
  173. }
  174. m.mu.Unlock()
  175. if limit < 0 {
  176. limit = 0
  177. }
  178. return int(limit)
  179. }
  180. // SetTransferSize specifies the total size of the data transfer, which allows
  181. // the Monitor to calculate the overall progress and time to completion.
  182. func (m *Monitor) SetTransferSize(bytes int64) {
  183. if bytes < 0 {
  184. bytes = 0
  185. }
  186. m.mu.Lock()
  187. m.tBytes = bytes
  188. m.mu.Unlock()
  189. }
  190. // update accumulates the transferred byte count for the current sample until
  191. // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
  192. // sample is done.
  193. func (m *Monitor) update(n int) (now time.Duration) {
  194. if !m.active {
  195. return
  196. }
  197. if now = clock(); n > 0 {
  198. m.tLast = now
  199. }
  200. m.sBytes += int64(n)
  201. if sTime := now - m.sLast; sTime >= m.sRate {
  202. t := sTime.Seconds()
  203. if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
  204. m.rPeak = m.rSample
  205. }
  206. // Exponential moving average using a method similar to *nix load
  207. // average calculation. Longer sampling periods carry greater weight.
  208. if m.samples > 0 {
  209. w := math.Exp(-t / m.rWindow)
  210. m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
  211. } else {
  212. m.rEMA = m.rSample
  213. }
  214. m.reset(now)
  215. }
  216. return
  217. }
  218. // reset clears the current sample state in preparation for the next sample.
  219. func (m *Monitor) reset(sampleTime time.Duration) {
  220. m.bytes += m.sBytes
  221. m.samples++
  222. m.sBytes = 0
  223. m.sLast = sampleTime
  224. }
  225. // waitNextSample sleeps for the remainder of the current sample. The lock is
  226. // released and reacquired during the actual sleep period, so it's possible for
  227. // the transfer to be inactive when this method returns.
  228. func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
  229. const minWait = 5 * time.Millisecond
  230. current := m.sLast
  231. // sleep until the last sample time changes (ideally, just one iteration)
  232. for m.sLast == current && m.active {
  233. d := current + m.sRate - now
  234. m.mu.Unlock()
  235. if d < minWait {
  236. d = minWait
  237. }
  238. time.Sleep(d)
  239. m.mu.Lock()
  240. now = m.update(0)
  241. }
  242. return now
  243. }