bounded_frequency_runner.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package async
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/client-go/util/flowcontrol"
  19. "k8s.io/klog"
  20. )
  21. // BoundedFrequencyRunner manages runs of a user-provided function.
  22. // See NewBoundedFrequencyRunner for examples.
  23. type BoundedFrequencyRunner struct {
  24. name string // the name of this instance
  25. minInterval time.Duration // the min time between runs, modulo bursts
  26. maxInterval time.Duration // the max time between runs
  27. run chan struct{} // try an async run
  28. mu sync.Mutex // guards runs of fn and all mutations
  29. fn func() // function to run
  30. lastRun time.Time // time of last run
  31. timer timer // timer for deferred runs
  32. limiter rateLimiter // rate limiter for on-demand runs
  33. }
  34. // designed so that flowcontrol.RateLimiter satisfies
  35. type rateLimiter interface {
  36. TryAccept() bool
  37. Stop()
  38. }
  39. type nullLimiter struct{}
  40. func (nullLimiter) TryAccept() bool {
  41. return true
  42. }
  43. func (nullLimiter) Stop() {}
  44. var _ rateLimiter = nullLimiter{}
  45. // for testing
  46. type timer interface {
  47. // C returns the timer's selectable channel.
  48. C() <-chan time.Time
  49. // See time.Timer.Reset.
  50. Reset(d time.Duration) bool
  51. // See time.Timer.Stop.
  52. Stop() bool
  53. // See time.Now.
  54. Now() time.Time
  55. // See time.Since.
  56. Since(t time.Time) time.Duration
  57. // See time.Sleep.
  58. Sleep(d time.Duration)
  59. }
  60. // implement our timer in terms of std time.Timer.
  61. type realTimer struct {
  62. *time.Timer
  63. }
  64. func (rt realTimer) C() <-chan time.Time {
  65. return rt.Timer.C
  66. }
  67. func (rt realTimer) Now() time.Time {
  68. return time.Now()
  69. }
  70. func (rt realTimer) Since(t time.Time) time.Duration {
  71. return time.Since(t)
  72. }
  73. func (rt realTimer) Sleep(d time.Duration) {
  74. time.Sleep(d)
  75. }
  76. var _ timer = realTimer{}
  77. // NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
  78. // which will manage runs of the specified function.
  79. //
  80. // All runs will be async to the caller of BoundedFrequencyRunner.Run, but
  81. // multiple runs are serialized. If the function needs to hold locks, it must
  82. // take them internally.
  83. //
  84. // Runs of the function will have at least minInterval between them (from
  85. // completion to next start), except that up to bursts may be allowed. Burst
  86. // runs are "accumulated" over time, one per minInterval up to burstRuns total.
  87. // This can be used, for example, to mitigate the impact of expensive operations
  88. // being called in response to user-initiated operations. Run requests that
  89. // would violate the minInterval are coallesced and run at the next opportunity.
  90. //
  91. // The function will be run at least once per maxInterval. For example, this can
  92. // force periodic refreshes of state in the absence of anyone calling Run.
  93. //
  94. // Examples:
  95. //
  96. // NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
  97. // - fn will have at least 1 second between runs
  98. // - fn will have no more than 5 seconds between runs
  99. //
  100. // NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
  101. // - fn will have at least 3 seconds between runs, with up to 3 burst runs
  102. // - fn will have no more than 10 seconds between runs
  103. //
  104. // The maxInterval must be greater than or equal to the minInterval, If the
  105. // caller passes a maxInterval less than minInterval, this function will panic.
  106. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
  107. timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately
  108. <-timer.C() // consume the first tick
  109. return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
  110. }
  111. // Make an instance with dependencies injected.
  112. func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
  113. if maxInterval < minInterval {
  114. panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval))
  115. }
  116. if timer == nil {
  117. panic(fmt.Sprintf("%s: timer must be non-nil", name))
  118. }
  119. bfr := &BoundedFrequencyRunner{
  120. name: name,
  121. fn: fn,
  122. minInterval: minInterval,
  123. maxInterval: maxInterval,
  124. run: make(chan struct{}, 1),
  125. timer: timer,
  126. }
  127. if minInterval == 0 {
  128. bfr.limiter = nullLimiter{}
  129. } else {
  130. // allow burst updates in short succession
  131. qps := float32(time.Second) / float32(minInterval)
  132. bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
  133. }
  134. return bfr
  135. }
  136. // Loop handles the periodic timer and run requests. This is expected to be
  137. // called as a goroutine.
  138. func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  139. klog.V(3).Infof("%s Loop running", bfr.name)
  140. bfr.timer.Reset(bfr.maxInterval)
  141. for {
  142. select {
  143. case <-stop:
  144. bfr.stop()
  145. klog.V(3).Infof("%s Loop stopping", bfr.name)
  146. return
  147. case <-bfr.timer.C():
  148. bfr.tryRun()
  149. case <-bfr.run:
  150. bfr.tryRun()
  151. }
  152. }
  153. }
  154. // Run the function as soon as possible. If this is called while Loop is not
  155. // running, the call may be deferred indefinitely.
  156. // If there is already a queued request to call the underlying function, it
  157. // may be dropped - it is just guaranteed that we will try calling the
  158. // underlying function as soon as possible starting from now.
  159. func (bfr *BoundedFrequencyRunner) Run() {
  160. // If it takes a lot of time to run the underlying function, noone is really
  161. // processing elements from <run> channel. So to avoid blocking here on the
  162. // putting element to it, we simply skip it if there is already an element
  163. // in it.
  164. select {
  165. case bfr.run <- struct{}{}:
  166. default:
  167. }
  168. }
  169. // assumes the lock is not held
  170. func (bfr *BoundedFrequencyRunner) stop() {
  171. bfr.mu.Lock()
  172. defer bfr.mu.Unlock()
  173. bfr.limiter.Stop()
  174. bfr.timer.Stop()
  175. }
  176. // assumes the lock is not held
  177. func (bfr *BoundedFrequencyRunner) tryRun() {
  178. bfr.mu.Lock()
  179. defer bfr.mu.Unlock()
  180. if bfr.limiter.TryAccept() {
  181. // We're allowed to run the function right now.
  182. bfr.fn()
  183. bfr.lastRun = bfr.timer.Now()
  184. bfr.timer.Stop()
  185. bfr.timer.Reset(bfr.maxInterval)
  186. klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
  187. return
  188. }
  189. // It can't run right now, figure out when it can run next.
  190. elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
  191. nextPossible := bfr.minInterval - elapsed // time to next possible run
  192. nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
  193. klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
  194. if nextPossible < nextScheduled {
  195. // Set the timer for ASAP, but don't drain here. Assuming Loop is running,
  196. // it might get a delivery in the mean time, but that is OK.
  197. bfr.timer.Stop()
  198. bfr.timer.Reset(nextPossible)
  199. klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
  200. }
  201. }