bounded_frequency_runner_test.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package async
  14. import (
  15. "sync"
  16. "testing"
  17. "time"
  18. )
  19. // Track calls to the managed function.
  20. type receiver struct {
  21. lock sync.Mutex
  22. run bool
  23. }
  24. func (r *receiver) F() {
  25. r.lock.Lock()
  26. defer r.lock.Unlock()
  27. r.run = true
  28. }
  29. func (r *receiver) reset() bool {
  30. r.lock.Lock()
  31. defer r.lock.Unlock()
  32. was := r.run
  33. r.run = false
  34. return was
  35. }
  36. // A single change event in the fake timer.
  37. type timerUpdate struct {
  38. active bool
  39. next time.Duration // iff active == true
  40. }
  41. // Fake time.
  42. type fakeTimer struct {
  43. c chan time.Time
  44. lock sync.Mutex
  45. now time.Time
  46. active bool
  47. updated chan timerUpdate
  48. }
  49. func newFakeTimer() *fakeTimer {
  50. ft := &fakeTimer{
  51. now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC),
  52. c: make(chan time.Time),
  53. updated: make(chan timerUpdate),
  54. }
  55. return ft
  56. }
  57. func (ft *fakeTimer) C() <-chan time.Time {
  58. return ft.c
  59. }
  60. func (ft *fakeTimer) Reset(in time.Duration) bool {
  61. ft.lock.Lock()
  62. defer ft.lock.Unlock()
  63. was := ft.active
  64. ft.active = true
  65. ft.updated <- timerUpdate{
  66. active: true,
  67. next: in,
  68. }
  69. return was
  70. }
  71. func (ft *fakeTimer) Stop() bool {
  72. ft.lock.Lock()
  73. defer ft.lock.Unlock()
  74. was := ft.active
  75. ft.active = false
  76. ft.updated <- timerUpdate{
  77. active: false,
  78. }
  79. return was
  80. }
  81. func (ft *fakeTimer) Now() time.Time {
  82. ft.lock.Lock()
  83. defer ft.lock.Unlock()
  84. return ft.now
  85. }
  86. func (ft *fakeTimer) Since(t time.Time) time.Duration {
  87. ft.lock.Lock()
  88. defer ft.lock.Unlock()
  89. return ft.now.Sub(t)
  90. }
  91. func (ft *fakeTimer) Sleep(d time.Duration) {
  92. ft.lock.Lock()
  93. defer ft.lock.Unlock()
  94. ft.advance(d)
  95. }
  96. // advance the current time.
  97. func (ft *fakeTimer) advance(d time.Duration) {
  98. ft.lock.Lock()
  99. defer ft.lock.Unlock()
  100. ft.now = ft.now.Add(d)
  101. }
  102. // send a timer tick.
  103. func (ft *fakeTimer) tick() {
  104. ft.lock.Lock()
  105. defer ft.lock.Unlock()
  106. ft.active = false
  107. ft.c <- ft.now
  108. }
  109. // return the calling line number (for printing)
  110. // test the timer's state
  111. func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
  112. if upd.active != active {
  113. t.Fatalf("%s: expected timer active=%v", name, active)
  114. }
  115. if active && upd.next != next {
  116. t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
  117. }
  118. }
  119. // test and reset the receiver's state
  120. func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
  121. triggered := receiver.reset()
  122. if expected && !triggered {
  123. t.Fatalf("%s: function should have been called", name)
  124. } else if !expected && triggered {
  125. t.Fatalf("%s: function should not have been called", name)
  126. }
  127. }
  128. // Durations embedded in test cases depend on these.
  129. var minInterval = 1 * time.Second
  130. var maxInterval = 10 * time.Second
  131. func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
  132. upd := <-timer.updated // wait for stop
  133. checkReceiver(name, t, obj, expectCall)
  134. checkReceiver(name, t, obj, false) // prove post-condition
  135. checkTimer(name, t, upd, false, 0)
  136. upd = <-timer.updated // wait for reset
  137. checkTimer(name, t, upd, true, expectNext)
  138. }
  139. func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
  140. waitForReset(name, t, timer, obj, true, maxInterval)
  141. }
  142. func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
  143. waitForReset(name, t, timer, obj, false, expectNext)
  144. }
  145. func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
  146. obj := &receiver{}
  147. timer := newFakeTimer()
  148. runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
  149. stop := make(chan struct{})
  150. var upd timerUpdate
  151. // Start.
  152. go runner.Loop(stop)
  153. upd = <-timer.updated // wait for initial time to be set to max
  154. checkTimer("init", t, upd, true, maxInterval)
  155. checkReceiver("init", t, obj, false)
  156. // Run once, immediately.
  157. // rel=0ms
  158. runner.Run()
  159. waitForRun("first run", t, timer, obj)
  160. // Run again, before minInterval expires.
  161. timer.advance(500 * time.Millisecond) // rel=500ms
  162. runner.Run()
  163. waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
  164. // Run again, before minInterval expires.
  165. timer.advance(499 * time.Millisecond) // rel=999ms
  166. runner.Run()
  167. waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
  168. // Run again, once minInterval has passed (race with timer).
  169. timer.advance(1 * time.Millisecond) // rel=1000ms
  170. runner.Run()
  171. waitForRun("second run", t, timer, obj)
  172. // Run again, before minInterval expires.
  173. // rel=0ms
  174. runner.Run()
  175. waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
  176. // Run again, before minInterval expires.
  177. timer.advance(1 * time.Millisecond) // rel=1ms
  178. runner.Run()
  179. waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
  180. // Let the timer tick prematurely.
  181. timer.advance(998 * time.Millisecond) // rel=999ms
  182. timer.tick()
  183. waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond)
  184. // Let the timer tick.
  185. timer.advance(1 * time.Millisecond) // rel=1000ms
  186. timer.tick()
  187. waitForRun("first tick", t, timer, obj)
  188. // Let the timer tick.
  189. timer.advance(10 * time.Second) // rel=10000ms
  190. timer.tick()
  191. waitForRun("second tick", t, timer, obj)
  192. // Run again, before minInterval expires.
  193. timer.advance(1 * time.Millisecond) // rel=1ms
  194. runner.Run()
  195. waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond)
  196. // Let the timer tick.
  197. timer.advance(999 * time.Millisecond) // rel=1000ms
  198. timer.tick()
  199. waitForRun("third tick", t, timer, obj)
  200. // Clean up.
  201. stop <- struct{}{}
  202. }
  203. func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
  204. obj := &receiver{}
  205. timer := newFakeTimer()
  206. runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
  207. stop := make(chan struct{})
  208. var upd timerUpdate
  209. // Start.
  210. go runner.Loop(stop)
  211. upd = <-timer.updated // wait for initial time to be set to max
  212. checkTimer("init", t, upd, true, maxInterval)
  213. checkReceiver("init", t, obj, false)
  214. // Run once, immediately.
  215. // abs=0ms, rel=0ms
  216. runner.Run()
  217. waitForRun("first run", t, timer, obj)
  218. // Run again, before minInterval expires, with burst.
  219. timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
  220. runner.Run()
  221. waitForRun("second run", t, timer, obj)
  222. // Run again, before minInterval expires.
  223. timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
  224. runner.Run()
  225. waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
  226. // Run again, before minInterval expires.
  227. timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
  228. runner.Run()
  229. waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
  230. // Run again, before minInterval expires.
  231. timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
  232. runner.Run()
  233. waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
  234. // Run again, once burst has replenished.
  235. timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
  236. runner.Run()
  237. waitForRun("third run", t, timer, obj)
  238. // Run again, before minInterval expires.
  239. timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
  240. runner.Run()
  241. waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
  242. // Run again, before minInterval expires.
  243. timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
  244. runner.Run()
  245. waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
  246. // Run again, once burst has replenished.
  247. timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
  248. runner.Run()
  249. waitForRun("fourth run", t, timer, obj)
  250. // Run again, once burst has fully replenished.
  251. timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
  252. runner.Run()
  253. waitForRun("fifth run", t, timer, obj)
  254. runner.Run()
  255. waitForRun("sixth run", t, timer, obj)
  256. runner.Run()
  257. waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
  258. // Let the timer tick.
  259. timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
  260. timer.tick()
  261. waitForRun("first tick", t, timer, obj)
  262. // Let the timer tick.
  263. timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
  264. timer.tick()
  265. waitForRun("second tick", t, timer, obj)
  266. // Clean up.
  267. stop <- struct{}{}
  268. }