bounded_frequency_runner_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package async
  14. import (
  15. "sync"
  16. "testing"
  17. "time"
  18. )
  19. // Track calls to the managed function.
  20. type receiver struct {
  21. lock sync.Mutex
  22. run bool
  23. retryFn func()
  24. }
  25. func (r *receiver) F() {
  26. r.lock.Lock()
  27. defer r.lock.Unlock()
  28. r.run = true
  29. if r.retryFn != nil {
  30. r.retryFn()
  31. r.retryFn = nil
  32. }
  33. }
  34. func (r *receiver) reset() bool {
  35. r.lock.Lock()
  36. defer r.lock.Unlock()
  37. was := r.run
  38. r.run = false
  39. return was
  40. }
  41. func (r *receiver) setRetryFn(retryFn func()) {
  42. r.lock.Lock()
  43. defer r.lock.Unlock()
  44. r.retryFn = retryFn
  45. }
  46. // A single change event in the fake timer.
  47. type timerUpdate struct {
  48. active bool
  49. next time.Duration // iff active == true
  50. }
  51. // Fake time.
  52. type fakeTimer struct {
  53. c chan time.Time
  54. lock sync.Mutex
  55. now time.Time
  56. timeout time.Time
  57. active bool
  58. updated chan timerUpdate
  59. }
  60. func newFakeTimer() *fakeTimer {
  61. ft := &fakeTimer{
  62. now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
  63. c: make(chan time.Time),
  64. updated: make(chan timerUpdate),
  65. }
  66. return ft
  67. }
  68. func (ft *fakeTimer) C() <-chan time.Time {
  69. return ft.c
  70. }
  71. func (ft *fakeTimer) Reset(in time.Duration) bool {
  72. ft.lock.Lock()
  73. defer ft.lock.Unlock()
  74. was := ft.active
  75. ft.active = true
  76. ft.timeout = ft.now.Add(in)
  77. ft.updated <- timerUpdate{
  78. active: true,
  79. next: in,
  80. }
  81. return was
  82. }
  83. func (ft *fakeTimer) Stop() bool {
  84. ft.lock.Lock()
  85. defer ft.lock.Unlock()
  86. was := ft.active
  87. ft.active = false
  88. ft.updated <- timerUpdate{
  89. active: false,
  90. }
  91. return was
  92. }
  93. func (ft *fakeTimer) Now() time.Time {
  94. ft.lock.Lock()
  95. defer ft.lock.Unlock()
  96. return ft.now
  97. }
  98. func (ft *fakeTimer) Remaining() time.Duration {
  99. ft.lock.Lock()
  100. defer ft.lock.Unlock()
  101. return ft.timeout.Sub(ft.now)
  102. }
  103. func (ft *fakeTimer) Since(t time.Time) time.Duration {
  104. ft.lock.Lock()
  105. defer ft.lock.Unlock()
  106. return ft.now.Sub(t)
  107. }
  108. func (ft *fakeTimer) Sleep(d time.Duration) {
  109. // ft.advance grabs ft.lock
  110. ft.advance(d)
  111. }
  112. // advance the current time.
  113. func (ft *fakeTimer) advance(d time.Duration) {
  114. ft.lock.Lock()
  115. defer ft.lock.Unlock()
  116. ft.now = ft.now.Add(d)
  117. if ft.active && !ft.now.Before(ft.timeout) {
  118. ft.active = false
  119. ft.c <- ft.timeout
  120. }
  121. }
  122. // return the calling line number (for printing)
  123. // test the timer's state
  124. func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
  125. if upd.active != active {
  126. t.Fatalf("%s: expected timer active=%v", name, active)
  127. }
  128. if active && upd.next != next {
  129. t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
  130. }
  131. }
  132. // test and reset the receiver's state
  133. func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
  134. triggered := receiver.reset()
  135. if expected && !triggered {
  136. t.Fatalf("%s: function should have been called", name)
  137. } else if !expected && triggered {
  138. t.Fatalf("%s: function should not have been called", name)
  139. }
  140. }
  141. // Durations embedded in test cases depend on these.
  142. var minInterval = 1 * time.Second
  143. var maxInterval = 10 * time.Second
  144. func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
  145. upd := <-timer.updated // wait for stop
  146. checkReceiver(name, t, obj, expectCall)
  147. checkReceiver(name, t, obj, false) // prove post-condition
  148. checkTimer(name, t, upd, false, 0)
  149. upd = <-timer.updated // wait for reset
  150. checkTimer(name, t, upd, true, expectNext)
  151. }
  152. func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
  153. waitForReset(name, t, timer, obj, true, maxInterval)
  154. }
  155. func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
  156. // It will first get reset as with a normal run, and then get set again
  157. waitForRun(name, t, timer, obj)
  158. waitForReset(name, t, timer, obj, false, expectNext)
  159. }
  160. func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
  161. waitForReset(name, t, timer, obj, false, expectNext)
  162. }
  163. func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
  164. select {
  165. case <-timer.c:
  166. t.Fatalf("%s: unexpected timer tick", name)
  167. case upd := <-timer.updated:
  168. t.Fatalf("%s: unexpected timer update %v", name, upd)
  169. default:
  170. }
  171. checkReceiver(name, t, obj, false)
  172. }
  173. func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
  174. obj := &receiver{}
  175. timer := newFakeTimer()
  176. runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
  177. stop := make(chan struct{})
  178. var upd timerUpdate
  179. // Start.
  180. go runner.Loop(stop)
  181. upd = <-timer.updated // wait for initial time to be set to max
  182. checkTimer("init", t, upd, true, maxInterval)
  183. checkReceiver("init", t, obj, false)
  184. // Run once, immediately.
  185. // rel=0ms
  186. runner.Run()
  187. waitForRun("first run", t, timer, obj)
  188. // Run again, before minInterval expires.
  189. timer.advance(500 * time.Millisecond) // rel=500ms
  190. runner.Run()
  191. waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
  192. // Run again, before minInterval expires.
  193. timer.advance(499 * time.Millisecond) // rel=999ms
  194. runner.Run()
  195. waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
  196. // Do the deferred run
  197. timer.advance(1 * time.Millisecond) // rel=1000ms
  198. waitForRun("second run", t, timer, obj)
  199. // Try again immediately
  200. runner.Run()
  201. waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
  202. // Run again, before minInterval expires.
  203. timer.advance(1 * time.Millisecond) // rel=1ms
  204. runner.Run()
  205. waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
  206. // Ensure that we don't run again early
  207. timer.advance(998 * time.Millisecond) // rel=999ms
  208. waitForNothing("premature", t, timer, obj)
  209. // Do the deferred run
  210. timer.advance(1 * time.Millisecond) // rel=1000ms
  211. waitForRun("third run", t, timer, obj)
  212. // Let minInterval pass, but there are no runs queued
  213. timer.advance(1 * time.Second) // rel=1000ms
  214. waitForNothing("minInterval", t, timer, obj)
  215. // Let maxInterval pass
  216. timer.advance(9 * time.Second) // rel=10000ms
  217. waitForRun("maxInterval", t, timer, obj)
  218. // Run again, before minInterval expires.
  219. timer.advance(1 * time.Millisecond) // rel=1ms
  220. runner.Run()
  221. waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
  222. // Let minInterval pass
  223. timer.advance(999 * time.Millisecond) // rel=1000ms
  224. waitForRun("fourth run", t, timer, obj)
  225. // Clean up.
  226. stop <- struct{}{}
  227. }
  228. func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
  229. obj := &receiver{}
  230. timer := newFakeTimer()
  231. runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
  232. stop := make(chan struct{})
  233. var upd timerUpdate
  234. // Start.
  235. go runner.Loop(stop)
  236. upd = <-timer.updated // wait for initial time to be set to max
  237. checkTimer("init", t, upd, true, maxInterval)
  238. checkReceiver("init", t, obj, false)
  239. // Run once, immediately.
  240. // abs=0ms, rel=0ms
  241. runner.Run()
  242. waitForRun("first run", t, timer, obj)
  243. // Run again, before minInterval expires, with burst.
  244. timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
  245. runner.Run()
  246. waitForRun("second run", t, timer, obj)
  247. // Run again, before minInterval expires.
  248. timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
  249. runner.Run()
  250. waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
  251. // Run again, before minInterval expires.
  252. timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
  253. runner.Run()
  254. waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
  255. // Run again, before minInterval expires.
  256. timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
  257. runner.Run()
  258. waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
  259. // Advance timer enough to replenish bursts, but not enough to be minInterval
  260. // after the last run
  261. timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
  262. waitForNothing("not minInterval", t, timer, obj)
  263. runner.Run()
  264. waitForRun("third run", t, timer, obj)
  265. // Run again, before minInterval expires.
  266. timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
  267. runner.Run()
  268. waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
  269. // Run again, before minInterval expires.
  270. timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
  271. runner.Run()
  272. waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
  273. // Advance and do the deferred run
  274. timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
  275. waitForRun("fourth run", t, timer, obj)
  276. // Run again, once burst has fully replenished.
  277. timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
  278. runner.Run()
  279. waitForRun("fifth run", t, timer, obj)
  280. runner.Run()
  281. waitForRun("sixth run", t, timer, obj)
  282. runner.Run()
  283. waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
  284. // Wait until minInterval after the last run
  285. timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
  286. waitForRun("seventh run", t, timer, obj)
  287. // Wait for maxInterval
  288. timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
  289. waitForRun("maxInterval", t, timer, obj)
  290. // Clean up.
  291. stop <- struct{}{}
  292. }
  293. func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
  294. obj := &receiver{}
  295. timer := newFakeTimer()
  296. runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
  297. stop := make(chan struct{})
  298. var upd timerUpdate
  299. // Start.
  300. go runner.Loop(stop)
  301. upd = <-timer.updated // wait for initial time to be set to max
  302. checkTimer("init", t, upd, true, maxInterval)
  303. checkReceiver("init", t, obj, false)
  304. // Run once, immediately, and queue a retry
  305. // rel=0ms
  306. obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
  307. runner.Run()
  308. waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
  309. // Nothing happens...
  310. timer.advance(time.Second) // rel=1000ms
  311. waitForNothing("minInterval, nothing queued", t, timer, obj)
  312. // After retryInterval, function is called
  313. timer.advance(4 * time.Second) // rel=5000ms
  314. waitForRun("retry", t, timer, obj)
  315. // Run again, before minInterval expires.
  316. timer.advance(499 * time.Millisecond) // rel=499ms
  317. runner.Run()
  318. waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
  319. // Do the deferred run, queue another retry after it returns
  320. timer.advance(501 * time.Millisecond) // rel=1000ms
  321. runner.RetryAfter(5 * time.Second)
  322. waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
  323. // Wait for minInterval to pass
  324. timer.advance(time.Second) // rel=1000ms
  325. waitForNothing("minInterval, nothing queued", t, timer, obj)
  326. // Now do another run
  327. runner.Run()
  328. waitForRun("third run", t, timer, obj)
  329. // Retry was cancelled because we already ran
  330. timer.advance(4 * time.Second)
  331. waitForNothing("retry cancelled", t, timer, obj)
  332. // Run, queue a retry from a goroutine
  333. obj.setRetryFn(func() {
  334. go func() {
  335. time.Sleep(100 * time.Millisecond)
  336. runner.RetryAfter(5 * time.Second)
  337. }()
  338. })
  339. runner.Run()
  340. waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
  341. // Call Run again before minInterval passes
  342. timer.advance(100 * time.Millisecond) // rel=100ms
  343. runner.Run()
  344. waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
  345. // Deferred run will run after minInterval passes
  346. timer.advance(900 * time.Millisecond) // rel=1000ms
  347. waitForRun("fifth run", t, timer, obj)
  348. // Retry was cancelled because we already ran
  349. timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
  350. waitForNothing("retry cancelled", t, timer, obj)
  351. // Rerun happens after maxInterval
  352. timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
  353. waitForNothing("premature", t, timer, obj)
  354. timer.advance(time.Second) // rel=10s since run
  355. waitForRun("maxInterval", t, timer, obj)
  356. // Clean up.
  357. stop <- struct{}{}
  358. }