bounded_frequency_runner.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package async
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/client-go/util/flowcontrol"
  19. "k8s.io/klog"
  20. )
  21. // BoundedFrequencyRunner manages runs of a user-provided function.
  22. // See NewBoundedFrequencyRunner for examples.
  23. type BoundedFrequencyRunner struct {
  24. name string // the name of this instance
  25. minInterval time.Duration // the min time between runs, modulo bursts
  26. maxInterval time.Duration // the max time between runs
  27. run chan struct{} // try an async run
  28. mu sync.Mutex // guards runs of fn and all mutations
  29. fn func() // function to run
  30. lastRun time.Time // time of last run
  31. timer timer // timer for deferred runs
  32. limiter rateLimiter // rate limiter for on-demand runs
  33. retry chan struct{} // schedule a retry
  34. retryMu sync.Mutex // guards retryTime
  35. retryTime time.Time // when to retry
  36. }
  // rateLimiter is the slice of rate-limiter behaviour the runner relies on.
// Its method set is chosen so that flowcontrol.RateLimiter satisfies it
// directly.
type rateLimiter interface {
	// TryAccept reports whether an on-demand run may proceed right now.
	TryAccept() bool
	// Stop shuts the limiter down.
	Stop()
}
  // nullLimiter grants every request. construct installs it when minInterval
// is zero, i.e. when on-demand runs should not be throttled at all.
type nullLimiter struct{}

// TryAccept unconditionally allows the run.
func (nullLimiter) TryAccept() bool { return true }

// Stop is a no-op: nullLimiter holds no state to release.
func (nullLimiter) Stop() {}
  47. var _ rateLimiter = nullLimiter{}
  // timer abstracts the clock and the runner's single deferred-run timer so
// tests can substitute a fake. construct also hands it to the token-bucket
// limiter as that limiter's clock, which is why it carries Now/Since/Sleep.
type timer interface {
	// C returns the channel on which the timer delivers ticks.
	C() <-chan time.Time
	// Reset behaves like time.Timer.Reset.
	Reset(d time.Duration) bool
	// Stop behaves like time.Timer.Stop.
	Stop() bool
	// Remaining returns how long until the timer fires, if it is armed.
	Remaining() time.Duration

	// Now behaves like time.Now.
	Now() time.Time
	// Since behaves like time.Since.
	Since(t time.Time) time.Duration
	// Sleep behaves like time.Sleep.
	Sleep(d time.Duration)
}
  // realTimer implements the timer interface on top of the standard library's
// time.Timer.
type realTimer struct {
	timer *time.Timer
	// next is the deadline set by the most recent Reset. time.Timer does not
	// expose its deadline, so Remaining is derived from this field.
	next time.Time
}

// C returns the channel on which the underlying time.Timer delivers ticks.
func (rt *realTimer) C() <-chan time.Time {
	return rt.timer.C
}

// Reset re-arms the timer to fire after d and records the new deadline.
// Callers in this package always call Stop first, which discards any stale
// tick, so the re-armed timer cannot fire early.
func (rt *realTimer) Reset(d time.Duration) bool {
	rt.next = time.Now().Add(d)
	return rt.timer.Reset(d)
}

// Stop prevents the timer from firing and reports whether it was still armed,
// exactly like time.Timer.Stop.
//
// If the timer had already fired but its tick was never received, that stale
// tick is discarded here. Before Go 1.23, time.Timer.Reset must only be called
// on a stopped timer with a drained channel. Otherwise the old tick would be
// delivered right after the Reset and trigger a spurious run.
//
// The drain is non-blocking, so it never stalls. It assumes the goroutine
// calling Stop is the one that receives from C (in this file that is Loop),
// so no tick meant for another receiver is swallowed.
func (rt *realTimer) Stop() bool {
	if rt.timer.Stop() {
		return true
	}
	select {
	case <-rt.timer.C:
	default:
	}
	return false
}

// Now returns the current wall-clock time.
func (rt *realTimer) Now() time.Time {
	return time.Now()
}

// Remaining returns the time until the deadline set by the last Reset. It is
// negative once that deadline has passed.
func (rt *realTimer) Remaining() time.Duration {
	return time.Until(rt.next)
}

// Since returns the time elapsed since t.
func (rt *realTimer) Since(t time.Time) time.Duration {
	return time.Since(t)
}

// Sleep pauses the calling goroutine for d.
func (rt *realTimer) Sleep(d time.Duration) {
	time.Sleep(d)
}
  92. var _ timer = &realTimer{}
  93. // NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
  94. // which will manage runs of the specified function.
  95. //
  96. // All runs will be async to the caller of BoundedFrequencyRunner.Run, but
  97. // multiple runs are serialized. If the function needs to hold locks, it must
  98. // take them internally.
  99. //
  100. // Runs of the function will have at least minInterval between them (from
  101. // completion to next start), except that up to bursts may be allowed. Burst
  102. // runs are "accumulated" over time, one per minInterval up to burstRuns total.
  103. // This can be used, for example, to mitigate the impact of expensive operations
  104. // being called in response to user-initiated operations. Run requests that
  105. // would violate the minInterval are coallesced and run at the next opportunity.
  106. //
  107. // The function will be run at least once per maxInterval. For example, this can
  108. // force periodic refreshes of state in the absence of anyone calling Run.
  109. //
  110. // Examples:
  111. //
  112. // NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
  113. // - fn will have at least 1 second between runs
  114. // - fn will have no more than 5 seconds between runs
  115. //
  116. // NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
  117. // - fn will have at least 3 seconds between runs, with up to 3 burst runs
  118. // - fn will have no more than 10 seconds between runs
  119. //
  120. // The maxInterval must be greater than or equal to the minInterval, If the
  121. // caller passes a maxInterval less than minInterval, this function will panic.
  122. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
  123. timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
  124. <-timer.C() // consume the first tick
  125. return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
  126. }
  127. // Make an instance with dependencies injected.
  128. func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
  129. if maxInterval < minInterval {
  130. panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
  131. }
  132. if timer == nil {
  133. panic(fmt.Sprintf("%s: timer must be non-nil", name))
  134. }
  135. bfr := &BoundedFrequencyRunner{
  136. name: name,
  137. fn: fn,
  138. minInterval: minInterval,
  139. maxInterval: maxInterval,
  140. run: make(chan struct{}, 1),
  141. retry: make(chan struct{}, 1),
  142. timer: timer,
  143. }
  144. if minInterval == 0 {
  145. bfr.limiter = nullLimiter{}
  146. } else {
  147. // allow burst updates in short succession
  148. qps := float32(time.Second) / float32(minInterval)
  149. bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
  150. }
  151. return bfr
  152. }
  153. // Loop handles the periodic timer and run requests. This is expected to be
  154. // called as a goroutine.
  155. func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  156. klog.V(3).Infof("%s Loop running", bfr.name)
  157. bfr.timer.Reset(bfr.maxInterval)
  158. for {
  159. select {
  160. case <-stop:
  161. bfr.stop()
  162. klog.V(3).Infof("%s Loop stopping", bfr.name)
  163. return
  164. case <-bfr.timer.C():
  165. bfr.tryRun()
  166. case <-bfr.run:
  167. bfr.tryRun()
  168. case <-bfr.retry:
  169. bfr.doRetry()
  170. }
  171. }
  172. }
  173. // Run the function as soon as possible. If this is called while Loop is not
  174. // running, the call may be deferred indefinitely.
  175. // If there is already a queued request to call the underlying function, it
  176. // may be dropped - it is just guaranteed that we will try calling the
  177. // underlying function as soon as possible starting from now.
  178. func (bfr *BoundedFrequencyRunner) Run() {
  179. // If it takes a lot of time to run the underlying function, noone is really
  180. // processing elements from <run> channel. So to avoid blocking here on the
  181. // putting element to it, we simply skip it if there is already an element
  182. // in it.
  183. select {
  184. case bfr.run <- struct{}{}:
  185. default:
  186. }
  187. }
  188. // RetryAfter ensures that the function will run again after no later than interval. This
  189. // can be called from inside a run of the BoundedFrequencyRunner's function, or
  190. // asynchronously.
  191. func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) {
  192. // This could be called either with or without bfr.mu held, so we can't grab that
  193. // lock, and therefore we can't update the timer directly.
  194. // If the Loop thread is currently running fn then it may be a while before it
  195. // processes our retry request. But we want to retry at interval from now, not at
  196. // interval from "whenever doRetry eventually gets called". So we convert to
  197. // absolute time.
  198. retryTime := bfr.timer.Now().Add(interval)
  199. // We can't just write retryTime to a channel because there could be multiple
  200. // RetryAfter calls before Loop gets a chance to read from the channel. So we
  201. // record the soonest requested retry time in bfr.retryTime and then only signal
  202. // the Loop thread once, just like Run does.
  203. bfr.retryMu.Lock()
  204. defer bfr.retryMu.Unlock()
  205. if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) {
  206. return
  207. }
  208. bfr.retryTime = retryTime
  209. select {
  210. case bfr.retry <- struct{}{}:
  211. default:
  212. }
  213. }
  214. // assumes the lock is not held
  215. func (bfr *BoundedFrequencyRunner) stop() {
  216. bfr.mu.Lock()
  217. defer bfr.mu.Unlock()
  218. bfr.limiter.Stop()
  219. bfr.timer.Stop()
  220. }
  221. // assumes the lock is not held
  222. func (bfr *BoundedFrequencyRunner) doRetry() {
  223. bfr.mu.Lock()
  224. defer bfr.mu.Unlock()
  225. bfr.retryMu.Lock()
  226. defer bfr.retryMu.Unlock()
  227. if bfr.retryTime.IsZero() {
  228. return
  229. }
  230. // Timer wants an interval not an absolute time, so convert retryTime back now
  231. retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
  232. bfr.retryTime = time.Time{}
  233. if retryInterval < bfr.timer.Remaining() {
  234. klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
  235. bfr.timer.Stop()
  236. bfr.timer.Reset(retryInterval)
  237. }
  238. }
  239. // assumes the lock is not held
  240. func (bfr *BoundedFrequencyRunner) tryRun() {
  241. bfr.mu.Lock()
  242. defer bfr.mu.Unlock()
  243. if bfr.limiter.TryAccept() {
  244. // We're allowed to run the function right now.
  245. bfr.fn()
  246. bfr.lastRun = bfr.timer.Now()
  247. bfr.timer.Stop()
  248. bfr.timer.Reset(bfr.maxInterval)
  249. klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
  250. return
  251. }
  252. // It can't run right now, figure out when it can run next.
  253. elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
  254. nextPossible := bfr.minInterval - elapsed // time to next possible run
  255. nextScheduled := bfr.timer.Remaining() // time to next scheduled run
  256. klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
  257. // It's hard to avoid race conditions in the unit tests unless we always reset
  258. // the timer here, even when it's unchanged
  259. if nextPossible < nextScheduled {
  260. nextScheduled = nextPossible
  261. }
  262. bfr.timer.Stop()
  263. bfr.timer.Reset(nextScheduled)
  264. }