123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package async
- import (
- "sync"
- "testing"
- "time"
- )
- // Track calls to the managed function.
- type receiver struct {
- lock sync.Mutex
- run bool
- retryFn func()
- }
- func (r *receiver) F() {
- r.lock.Lock()
- defer r.lock.Unlock()
- r.run = true
- if r.retryFn != nil {
- r.retryFn()
- r.retryFn = nil
- }
- }
- func (r *receiver) reset() bool {
- r.lock.Lock()
- defer r.lock.Unlock()
- was := r.run
- r.run = false
- return was
- }
- func (r *receiver) setRetryFn(retryFn func()) {
- r.lock.Lock()
- defer r.lock.Unlock()
- r.retryFn = retryFn
- }
- // A single change event in the fake timer.
- type timerUpdate struct {
- active bool
- next time.Duration // iff active == true
- }
- // Fake time.
- type fakeTimer struct {
- c chan time.Time
- lock sync.Mutex
- now time.Time
- timeout time.Time
- active bool
- updated chan timerUpdate
- }
- func newFakeTimer() *fakeTimer {
- ft := &fakeTimer{
- now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
- c: make(chan time.Time),
- updated: make(chan timerUpdate),
- }
- return ft
- }
- func (ft *fakeTimer) C() <-chan time.Time {
- return ft.c
- }
- func (ft *fakeTimer) Reset(in time.Duration) bool {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- was := ft.active
- ft.active = true
- ft.timeout = ft.now.Add(in)
- ft.updated <- timerUpdate{
- active: true,
- next: in,
- }
- return was
- }
- func (ft *fakeTimer) Stop() bool {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- was := ft.active
- ft.active = false
- ft.updated <- timerUpdate{
- active: false,
- }
- return was
- }
- func (ft *fakeTimer) Now() time.Time {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- return ft.now
- }
- func (ft *fakeTimer) Remaining() time.Duration {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- return ft.timeout.Sub(ft.now)
- }
- func (ft *fakeTimer) Since(t time.Time) time.Duration {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- return ft.now.Sub(t)
- }
- func (ft *fakeTimer) Sleep(d time.Duration) {
- // ft.advance grabs ft.lock
- ft.advance(d)
- }
- // advance the current time.
- func (ft *fakeTimer) advance(d time.Duration) {
- ft.lock.Lock()
- defer ft.lock.Unlock()
- ft.now = ft.now.Add(d)
- if ft.active && !ft.now.Before(ft.timeout) {
- ft.active = false
- ft.c <- ft.timeout
- }
- }
- // return the calling line number (for printing)
- // test the timer's state
- func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
- if upd.active != active {
- t.Fatalf("%s: expected timer active=%v", name, active)
- }
- if active && upd.next != next {
- t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
- }
- }
- // test and reset the receiver's state
- func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
- triggered := receiver.reset()
- if expected && !triggered {
- t.Fatalf("%s: function should have been called", name)
- } else if !expected && triggered {
- t.Fatalf("%s: function should not have been called", name)
- }
- }
- // Durations embedded in test cases depend on these.
- var minInterval = 1 * time.Second
- var maxInterval = 10 * time.Second
- func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
- upd := <-timer.updated // wait for stop
- checkReceiver(name, t, obj, expectCall)
- checkReceiver(name, t, obj, false) // prove post-condition
- checkTimer(name, t, upd, false, 0)
- upd = <-timer.updated // wait for reset
- checkTimer(name, t, upd, true, expectNext)
- }
- func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
- waitForReset(name, t, timer, obj, true, maxInterval)
- }
- func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
- // It will first get reset as with a normal run, and then get set again
- waitForRun(name, t, timer, obj)
- waitForReset(name, t, timer, obj, false, expectNext)
- }
- func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
- waitForReset(name, t, timer, obj, false, expectNext)
- }
- func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
- select {
- case <-timer.c:
- t.Fatalf("%s: unexpected timer tick", name)
- case upd := <-timer.updated:
- t.Fatalf("%s: unexpected timer update %v", name, upd)
- default:
- }
- checkReceiver(name, t, obj, false)
- }
- func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
- obj := &receiver{}
- timer := newFakeTimer()
- runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
- stop := make(chan struct{})
- var upd timerUpdate
- // Start.
- go runner.Loop(stop)
- upd = <-timer.updated // wait for initial time to be set to max
- checkTimer("init", t, upd, true, maxInterval)
- checkReceiver("init", t, obj, false)
- // Run once, immediately.
- // rel=0ms
- runner.Run()
- waitForRun("first run", t, timer, obj)
- // Run again, before minInterval expires.
- timer.advance(500 * time.Millisecond) // rel=500ms
- runner.Run()
- waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
- // Run again, before minInterval expires.
- timer.advance(499 * time.Millisecond) // rel=999ms
- runner.Run()
- waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
- // Do the deferred run
- timer.advance(1 * time.Millisecond) // rel=1000ms
- waitForRun("second run", t, timer, obj)
- // Try again immediately
- runner.Run()
- waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
- // Run again, before minInterval expires.
- timer.advance(1 * time.Millisecond) // rel=1ms
- runner.Run()
- waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
- // Ensure that we don't run again early
- timer.advance(998 * time.Millisecond) // rel=999ms
- waitForNothing("premature", t, timer, obj)
- // Do the deferred run
- timer.advance(1 * time.Millisecond) // rel=1000ms
- waitForRun("third run", t, timer, obj)
- // Let minInterval pass, but there are no runs queued
- timer.advance(1 * time.Second) // rel=1000ms
- waitForNothing("minInterval", t, timer, obj)
- // Let maxInterval pass
- timer.advance(9 * time.Second) // rel=10000ms
- waitForRun("maxInterval", t, timer, obj)
- // Run again, before minInterval expires.
- timer.advance(1 * time.Millisecond) // rel=1ms
- runner.Run()
- waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
- // Let minInterval pass
- timer.advance(999 * time.Millisecond) // rel=1000ms
- waitForRun("fourth run", t, timer, obj)
- // Clean up.
- stop <- struct{}{}
- }
- func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
- obj := &receiver{}
- timer := newFakeTimer()
- runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
- stop := make(chan struct{})
- var upd timerUpdate
- // Start.
- go runner.Loop(stop)
- upd = <-timer.updated // wait for initial time to be set to max
- checkTimer("init", t, upd, true, maxInterval)
- checkReceiver("init", t, obj, false)
- // Run once, immediately.
- // abs=0ms, rel=0ms
- runner.Run()
- waitForRun("first run", t, timer, obj)
- // Run again, before minInterval expires, with burst.
- timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
- runner.Run()
- waitForRun("second run", t, timer, obj)
- // Run again, before minInterval expires.
- timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
- runner.Run()
- waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
- // Run again, before minInterval expires.
- timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
- runner.Run()
- waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
- // Run again, before minInterval expires.
- timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
- runner.Run()
- waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
- // Advance timer enough to replenish bursts, but not enough to be minInterval
- // after the last run
- timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
- waitForNothing("not minInterval", t, timer, obj)
- runner.Run()
- waitForRun("third run", t, timer, obj)
- // Run again, before minInterval expires.
- timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
- runner.Run()
- waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
- // Run again, before minInterval expires.
- timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
- runner.Run()
- waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
- // Advance and do the deferred run
- timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
- waitForRun("fourth run", t, timer, obj)
- // Run again, once burst has fully replenished.
- timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
- runner.Run()
- waitForRun("fifth run", t, timer, obj)
- runner.Run()
- waitForRun("sixth run", t, timer, obj)
- runner.Run()
- waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
- // Wait until minInterval after the last run
- timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
- waitForRun("seventh run", t, timer, obj)
- // Wait for maxInterval
- timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
- waitForRun("maxInterval", t, timer, obj)
- // Clean up.
- stop <- struct{}{}
- }
- func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
- obj := &receiver{}
- timer := newFakeTimer()
- runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
- stop := make(chan struct{})
- var upd timerUpdate
- // Start.
- go runner.Loop(stop)
- upd = <-timer.updated // wait for initial time to be set to max
- checkTimer("init", t, upd, true, maxInterval)
- checkReceiver("init", t, obj, false)
- // Run once, immediately, and queue a retry
- // rel=0ms
- obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
- runner.Run()
- waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
- // Nothing happens...
- timer.advance(time.Second) // rel=1000ms
- waitForNothing("minInterval, nothing queued", t, timer, obj)
- // After retryInterval, function is called
- timer.advance(4 * time.Second) // rel=5000ms
- waitForRun("retry", t, timer, obj)
- // Run again, before minInterval expires.
- timer.advance(499 * time.Millisecond) // rel=499ms
- runner.Run()
- waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
- // Do the deferred run, queue another retry after it returns
- timer.advance(501 * time.Millisecond) // rel=1000ms
- runner.RetryAfter(5 * time.Second)
- waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
- // Wait for minInterval to pass
- timer.advance(time.Second) // rel=1000ms
- waitForNothing("minInterval, nothing queued", t, timer, obj)
- // Now do another run
- runner.Run()
- waitForRun("third run", t, timer, obj)
- // Retry was cancelled because we already ran
- timer.advance(4 * time.Second)
- waitForNothing("retry cancelled", t, timer, obj)
- // Run, queue a retry from a goroutine
- obj.setRetryFn(func() {
- go func() {
- time.Sleep(100 * time.Millisecond)
- runner.RetryAfter(5 * time.Second)
- }()
- })
- runner.Run()
- waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
- // Call Run again before minInterval passes
- timer.advance(100 * time.Millisecond) // rel=100ms
- runner.Run()
- waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
- // Deferred run will run after minInterval passes
- timer.advance(900 * time.Millisecond) // rel=1000ms
- waitForRun("fifth run", t, timer, obj)
- // Retry was cancelled because we already ran
- timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
- waitForNothing("retry cancelled", t, timer, obj)
- // Rerun happens after maxInterval
- timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
- waitForNothing("premature", t, timer, obj)
- timer.advance(time.Second) // rel=10s since run
- waitForRun("maxInterval", t, timer, obj)
- // Clean up.
- stop <- struct{}{}
- }
|