common/mclock: clean up AfterFunc support (#20054)

This change adds tests for the virtual clock and aligns the interface
with the time package by renaming Cancel to Stop. It also removes the
binary search from Stop because it complicates the code unnecessarily.
This commit is contained in:
Felix Lange 2019-09-16 11:16:30 +02:00 committed by GitHub
parent aff986958d
commit b1c3010bf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 160 additions and 67 deletions

@ -36,47 +36,39 @@ func (t AbsTime) Add(d time.Duration) AbsTime {
return t + AbsTime(d) return t + AbsTime(d)
} }
// Clock interface makes it possible to replace the monotonic system clock with // The Clock interface makes it possible to replace the monotonic system clock with
// a simulated clock. // a simulated clock.
type Clock interface { type Clock interface {
Now() AbsTime Now() AbsTime
Sleep(time.Duration) Sleep(time.Duration)
After(time.Duration) <-chan time.Time After(time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Event AfterFunc(d time.Duration, f func()) Timer
} }
// Event represents a cancellable event returned by AfterFunc // Timer represents a cancellable event returned by AfterFunc
type Event interface { type Timer interface {
Cancel() bool Stop() bool
} }
// System implements Clock using the system clock. // System implements Clock using the system clock.
type System struct{} type System struct{}
// Now implements Clock. // Now returns the current monotonic time.
func (System) Now() AbsTime { func (System) Now() AbsTime {
return AbsTime(monotime.Now()) return AbsTime(monotime.Now())
} }
// Sleep implements Clock. // Sleep blocks for the given duration.
func (System) Sleep(d time.Duration) { func (System) Sleep(d time.Duration) {
time.Sleep(d) time.Sleep(d)
} }
// After implements Clock. // After returns a channel which receives the current time after d has elapsed.
func (System) After(d time.Duration) <-chan time.Time { func (System) After(d time.Duration) <-chan time.Time {
return time.After(d) return time.After(d)
} }
// AfterFunc implements Clock. // AfterFunc runs f on a new goroutine after the duration has elapsed.
func (System) AfterFunc(d time.Duration, f func()) Event { func (System) AfterFunc(d time.Duration, f func()) Timer {
return (*SystemEvent)(time.AfterFunc(d, f)) return time.AfterFunc(d, f)
}
// SystemEvent implements Event using time.Timer.
type SystemEvent time.Timer
// Cancel implements Event.
func (e *SystemEvent) Cancel() bool {
return (*time.Timer)(e).Stop()
} }

@ -32,22 +32,17 @@ import (
// the timeout using a channel or semaphore. // the timeout using a channel or semaphore.
type Simulated struct { type Simulated struct {
now AbsTime now AbsTime
scheduled []event scheduled []*simTimer
mu sync.RWMutex mu sync.RWMutex
cond *sync.Cond cond *sync.Cond
lastId uint64 lastId uint64
} }
type event struct { // simTimer implements Timer on the virtual clock.
type simTimer struct {
do func() do func()
at AbsTime at AbsTime
id uint64 id uint64
}
// SimulatedEvent implements Event for a virtual clock.
type SimulatedEvent struct {
at AbsTime
id uint64
s *Simulated s *Simulated
} }
@ -75,6 +70,7 @@ func (s *Simulated) Run(d time.Duration) {
} }
} }
// ActiveTimers returns the number of timers that haven't fired.
func (s *Simulated) ActiveTimers() int { func (s *Simulated) ActiveTimers() int {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
@ -82,6 +78,7 @@ func (s *Simulated) ActiveTimers() int {
return len(s.scheduled) return len(s.scheduled)
} }
// WaitForTimers waits until the clock has at least n scheduled timers.
func (s *Simulated) WaitForTimers(n int) { func (s *Simulated) WaitForTimers(n int) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -92,7 +89,7 @@ func (s *Simulated) WaitForTimers(n int) {
} }
} }
// Now implements Clock. // Now returns the current virtual time.
func (s *Simulated) Now() AbsTime { func (s *Simulated) Now() AbsTime {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
@ -100,12 +97,13 @@ func (s *Simulated) Now() AbsTime {
return s.now return s.now
} }
// Sleep implements Clock. // Sleep blocks until the clock has advanced by d.
func (s *Simulated) Sleep(d time.Duration) { func (s *Simulated) Sleep(d time.Duration) {
<-s.After(d) <-s.After(d)
} }
// After implements Clock. // After returns a channel which receives the current time after the clock
// has advanced by d.
func (s *Simulated) After(d time.Duration) <-chan time.Time { func (s *Simulated) After(d time.Duration) <-chan time.Time {
after := make(chan time.Time, 1) after := make(chan time.Time, 1)
s.AfterFunc(d, func() { s.AfterFunc(d, func() {
@ -114,8 +112,9 @@ func (s *Simulated) After(d time.Duration) <-chan time.Time {
return after return after
} }
// AfterFunc implements Clock. // AfterFunc runs fn after the clock has advanced by d. Unlike with the system
func (s *Simulated) AfterFunc(d time.Duration, do func()) Event { // clock, fn runs on the goroutine that calls Run.
func (s *Simulated) AfterFunc(d time.Duration, fn func()) Timer {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.init() s.init()
@ -133,12 +132,27 @@ func (s *Simulated) AfterFunc(d time.Duration, do func()) Event {
l = m + 1 l = m + 1
} }
} }
s.scheduled = append(s.scheduled, event{}) ev := &simTimer{do: fn, at: at, s: s}
s.scheduled = append(s.scheduled, nil)
copy(s.scheduled[l+1:], s.scheduled[l:ll]) copy(s.scheduled[l+1:], s.scheduled[l:ll])
e := event{do: do, at: at, id: id} s.scheduled[l] = ev
s.scheduled[l] = e
s.cond.Broadcast() s.cond.Broadcast()
return &SimulatedEvent{at: at, id: id, s: s} return ev
}
func (ev *simTimer) Stop() bool {
s := ev.s
s.mu.Lock()
defer s.mu.Unlock()
for i := 0; i < len(s.scheduled); i++ {
if s.scheduled[i] == ev {
s.scheduled = append(s.scheduled[:i], s.scheduled[i+1:]...)
s.cond.Broadcast()
return true
}
}
return false
} }
func (s *Simulated) init() { func (s *Simulated) init() {
@ -146,31 +160,3 @@ func (s *Simulated) init() {
s.cond = sync.NewCond(&s.mu) s.cond = sync.NewCond(&s.mu)
} }
} }
// Cancel implements Event.
func (e *SimulatedEvent) Cancel() bool {
s := e.s
s.mu.Lock()
defer s.mu.Unlock()
l, h := 0, len(s.scheduled)
ll := h
for l != h {
m := (l + h) / 2
if e.id == s.scheduled[m].id {
l = m
break
}
if (e.at < s.scheduled[m].at) || ((e.at == s.scheduled[m].at) && (e.id < s.scheduled[m].id)) {
h = m
} else {
l = m + 1
}
}
if l >= ll || s.scheduled[l].id != e.id {
return false
}
copy(s.scheduled[l:ll-1], s.scheduled[l+1:])
s.scheduled = s.scheduled[:ll-1]
return true
}

@ -0,0 +1,115 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package mclock
import (
"testing"
"time"
)
var _ Clock = System{}
var _ Clock = new(Simulated)
func TestSimulatedAfter(t *testing.T) {
const timeout = 30 * time.Minute
const adv = time.Minute
var (
c Simulated
end = c.Now().Add(timeout)
ch = c.After(timeout)
)
for c.Now() < end.Add(-adv) {
c.Run(adv)
select {
case <-ch:
t.Fatal("Timer fired early")
default:
}
}
c.Run(adv)
select {
case stamp := <-ch:
want := time.Time{}.Add(timeout)
if !stamp.Equal(want) {
t.Errorf("Wrong time sent on timer channel: got %v, want %v", stamp, want)
}
default:
t.Fatal("Timer didn't fire")
}
}
func TestSimulatedAfterFunc(t *testing.T) {
var c Simulated
called1 := false
timer1 := c.AfterFunc(100*time.Millisecond, func() { called1 = true })
if c.ActiveTimers() != 1 {
t.Fatalf("%d active timers, want one", c.ActiveTimers())
}
if fired := timer1.Stop(); !fired {
t.Fatal("Stop returned false even though timer didn't fire")
}
if c.ActiveTimers() != 0 {
t.Fatalf("%d active timers, want zero", c.ActiveTimers())
}
if called1 {
t.Fatal("timer 1 called")
}
if fired := timer1.Stop(); fired {
t.Fatal("Stop returned true after timer was already stopped")
}
called2 := false
timer2 := c.AfterFunc(100*time.Millisecond, func() { called2 = true })
c.Run(50 * time.Millisecond)
if called2 {
t.Fatal("timer 2 called")
}
c.Run(51 * time.Millisecond)
if !called2 {
t.Fatal("timer 2 not called")
}
if fired := timer2.Stop(); fired {
t.Fatal("Stop returned true after timer has fired")
}
}
func TestSimulatedSleep(t *testing.T) {
var (
c Simulated
timeout = 1 * time.Hour
done = make(chan AbsTime)
)
go func() {
c.Sleep(timeout)
done <- c.Now()
}()
c.WaitForTimers(1)
c.Run(2 * timeout)
select {
case stamp := <-done:
want := AbsTime(2 * timeout)
if stamp != want {
t.Errorf("Wrong time after sleep: got %v, want %v", stamp, want)
}
case <-time.After(5 * time.Second):
t.Fatal("Sleep didn't return in time")
}
}

@ -42,7 +42,7 @@ type balanceTracker struct {
negTimeFactor, negRequestFactor float64 negTimeFactor, negRequestFactor float64
sumReqCost uint64 sumReqCost uint64
lastUpdate, nextUpdate, initTime mclock.AbsTime lastUpdate, nextUpdate, initTime mclock.AbsTime
updateEvent mclock.Event updateEvent mclock.Timer
// since only a limited and fixed number of callbacks are needed, they are // since only a limited and fixed number of callbacks are needed, they are
// stored in a fixed size array ordered by priority threshold. // stored in a fixed size array ordered by priority threshold.
callbacks [balanceCallbackCount]balanceCallback callbacks [balanceCallbackCount]balanceCallback
@ -86,7 +86,7 @@ func (bt *balanceTracker) stop(now mclock.AbsTime) {
bt.timeFactor = 0 bt.timeFactor = 0
bt.requestFactor = 0 bt.requestFactor = 0
if bt.updateEvent != nil { if bt.updateEvent != nil {
bt.updateEvent.Cancel() bt.updateEvent.Stop()
bt.updateEvent = nil bt.updateEvent = nil
} }
} }
@ -235,7 +235,7 @@ func (bt *balanceTracker) checkCallbacks(now mclock.AbsTime) {
// updateAfter schedules a balance update and callback check in the future // updateAfter schedules a balance update and callback check in the future
func (bt *balanceTracker) updateAfter(dt time.Duration) { func (bt *balanceTracker) updateAfter(dt time.Duration) {
if bt.updateEvent == nil || bt.updateEvent.Cancel() { if bt.updateEvent == nil || bt.updateEvent.Stop() {
if dt == 0 { if dt == 0 {
bt.updateEvent = nil bt.updateEvent = nil
} else { } else {