diff --git a/proxyd/proxyd/go.mod b/proxyd/proxyd/go.mod index 68bf63c..97c7b02 100644 --- a/proxyd/proxyd/go.mod +++ b/proxyd/proxyd/go.mod @@ -31,6 +31,7 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/fjl/memsize v0.0.1 // indirect github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/go-ole/go-ole v1.2.6 // indirect diff --git a/proxyd/proxyd/go.sum b/proxyd/proxyd/go.sum index 4db2ccf..4c71d15 100644 --- a/proxyd/proxyd/go.sum +++ b/proxyd/proxyd/go.sum @@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/proxyd/proxyd/pkg/avg-sliding-window/sliding.go b/proxyd/proxyd/pkg/avg-sliding-window/sliding.go new file mode 100644 index 0000000..721daba --- /dev/null +++ b/proxyd/proxyd/pkg/avg-sliding-window/sliding.go @@ -0,0 +1,176 @@ +package avg_sliding_window + +import ( + "time" + + lm "github.com/emirpasic/gods/maps/linkedhashmap" +) + +type Clock interface { + Now() time.Time +} + +// DefaultClock provides a clock that gets current time from the system time +type DefaultClock struct{} + +func NewDefaultClock() *DefaultClock { + return &DefaultClock{} +} +func (c DefaultClock) Now() time.Time { + return time.Now() +} + +// AdjustableClock provides a static clock to easily override the system time +type AdjustableClock struct { + now time.Time +} + +func NewAdjustableClock(now time.Time) *AdjustableClock { + return &AdjustableClock{now: now} +} +func (c *AdjustableClock) Now() time.Time { + return c.now +} +func (c *AdjustableClock) Set(now time.Time) { + c.now = now +} + +type bucket struct { + sum float64 + qty uint +} + +// AvgSlidingWindow calculates moving averages efficiently. +// Data points are rounded to nearest bucket of size `bucketSize`, +// and evicted when they are too old based on `windowLength` +type AvgSlidingWindow struct { + bucketSize time.Duration + windowLength time.Duration + clock Clock + buckets *lm.Map + qty uint + sum float64 +} + +type SlidingWindowOpts func(sw *AvgSlidingWindow) + +func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow { + sw := &AvgSlidingWindow{ + buckets: lm.New(), + } + for _, opt := range opts { + opt(sw) + } + if sw.bucketSize == 0 { + sw.bucketSize = time.Second + } + if sw.windowLength == 0 { + sw.windowLength = 5 * time.Minute + } + if sw.clock == nil { + sw.clock = NewDefaultClock() + } + return sw +} + +func WithWindowLength(windowLength time.Duration) SlidingWindowOpts { + return func(sw *AvgSlidingWindow) { + sw.windowLength = windowLength + } +} + +func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts { + return func(sw *AvgSlidingWindow) { + sw.bucketSize = bucketSize + } +} + +func WithClock(clock Clock) SlidingWindowOpts { + return func(sw *AvgSlidingWindow) { + sw.clock = clock + } +} + +func (sw *AvgSlidingWindow) inWindow(t time.Time) bool { + now := sw.clock.Now().Round(sw.bucketSize) + windowStart := now.Add(-sw.windowLength) + return windowStart.Before(t) && !t.After(now) +} + +// Add inserts a new data point into the window, with value `val` with the current time +func (sw *AvgSlidingWindow) Add(val float64) { + t := sw.clock.Now() + sw.AddWithTime(t, val) +} + +// AddWithTime inserts a new data point into the window, with value `val` and time `t` +func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) { + sw.advance() + + key := t.Round(sw.bucketSize) + if !sw.inWindow(key) { + return + } + + var b *bucket + current, found := sw.buckets.Get(key) + if !found { + b = &bucket{} + } else { + b = current.(*bucket) + } + + // update bucket + bsum := b.sum + b.qty += 1 + b.sum = bsum + val + + // update window + wsum := sw.sum + sw.qty += 1 + sw.sum = wsum - bsum + b.sum + sw.buckets.Put(key, b) +} + +// advance evicts old data points +func (sw *AvgSlidingWindow) advance() { + now := sw.clock.Now().Round(sw.bucketSize) + windowStart := now.Add(-sw.windowLength) + keys := sw.buckets.Keys() + for _, k := range keys { + if k.(time.Time).After(windowStart) { + break + } + val, _ := sw.buckets.Get(k) + b := val.(*bucket) + sw.buckets.Remove(k) + if sw.buckets.Size() > 0 { + sw.qty -= b.qty + sw.sum = sw.sum - b.sum + } else { + sw.qty = 0 + sw.sum = 0.0 + } + } +} + +// Avg retrieves the current average for the sliding window +func (sw *AvgSlidingWindow) Avg() float64 { + sw.advance() + if sw.qty == 0 { + return 0 + } + return sw.sum / float64(sw.qty) +} + +// Sum retrieves the current sum for the sliding window +func (sw *AvgSlidingWindow) Sum() float64 { + sw.advance() + return sw.sum +} + +// Count retrieves the data point count for the sliding window +func (sw *AvgSlidingWindow) Count() uint { + sw.advance() + return sw.qty +} diff --git a/proxyd/proxyd/pkg/avg-sliding-window/sliding_test.go b/proxyd/proxyd/pkg/avg-sliding-window/sliding_test.go new file mode 100644 index 0000000..7f5e9b7 --- /dev/null +++ b/proxyd/proxyd/pkg/avg-sliding-window/sliding_test.go @@ -0,0 +1,278 @@ +package avg_sliding_window + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSlidingWindow_AddWithTime_Single(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 5) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 5.0, sw.Sum()) + require.Equal(t, 1, int(sw.Count())) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum) +} + +func TestSlidingWindow_AddWithTime_TwoValues_SameBucket(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 5) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 10.0, sw.Sum()) + require.Equal(t, 2, int(sw.Count())) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 10.0, sw.buckets.Values()[0].(*bucket).sum) +} + +func TestSlidingWindow_AddWithTime_ThreeValues_SameBucket(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 4) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 6) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 15.0, sw.Sum()) + require.Equal(t, 3, int(sw.Count())) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 15.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty)) +} + +func TestSlidingWindow_AddWithTime_ThreeValues_ThreeBuckets(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:01"), 4) + sw.AddWithTime(ts("2023-04-21 15:04:02"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 6) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 15.0, sw.Sum()) + require.Equal(t, 3, int(sw.Count())) + require.Equal(t, 3, sw.buckets.Size()) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty)) + require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum) +} + +func TestSlidingWindow_AddWithTime_OutWindow(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:03:55"), 1000) + sw.AddWithTime(ts("2023-04-21 15:04:01"), 4) + sw.AddWithTime(ts("2023-04-21 15:04:02"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 6) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 15.0, sw.Sum()) + require.Equal(t, 3, int(sw.Count())) + require.Equal(t, 3, sw.buckets.Size()) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty)) + require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum) +} + +func TestSlidingWindow_AdvanceClock(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:01"), 4) + sw.AddWithTime(ts("2023-04-21 15:04:02"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 6) + require.Equal(t, 5.0, sw.Avg()) + require.Equal(t, 15.0, sw.Sum()) + require.Equal(t, 3, int(sw.Count())) + require.Equal(t, 3, sw.buckets.Size()) + + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty)) + require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum) + + // up until 15:04:05 we had 3 buckets + // let's advance the clock to 15:04:11 and the first data point should be evicted + clock.Set(ts("2023-04-21 15:04:11")) + require.Equal(t, 5.5, sw.Avg()) + require.Equal(t, 11.0, sw.Sum()) + require.Equal(t, 2, int(sw.Count())) + require.Equal(t, 2, sw.buckets.Size()) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 6.0, sw.buckets.Values()[1].(*bucket).sum) + + // let's advance the clock to 15:04:12 and another data point should be evicted + clock.Set(ts("2023-04-21 15:04:12")) + require.Equal(t, 6.0, sw.Avg()) + require.Equal(t, 6.0, sw.Sum()) + require.Equal(t, 1, int(sw.Count())) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 6.0, sw.buckets.Values()[0].(*bucket).sum) + + // let's advance the clock to 15:04:25 and all data point should be evicted + clock.Set(ts("2023-04-21 15:04:25")) + require.Equal(t, 0.0, sw.Avg()) + require.Equal(t, 0.0, sw.Sum()) + require.Equal(t, 0, int(sw.Count())) + require.Equal(t, 0, sw.buckets.Size()) +} + +func TestSlidingWindow_MultipleValPerBucket(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(10*time.Second), + WithBucketSize(time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:04:01"), 4) + sw.AddWithTime(ts("2023-04-21 15:04:01"), 12) + sw.AddWithTime(ts("2023-04-21 15:04:02"), 5) + sw.AddWithTime(ts("2023-04-21 15:04:02"), 15) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 6) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 3) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 1) + sw.AddWithTime(ts("2023-04-21 15:04:05"), 3) + require.Equal(t, 6.125, sw.Avg()) + require.Equal(t, 49.0, sw.Sum()) + require.Equal(t, 8, int(sw.Count())) + require.Equal(t, 3, sw.buckets.Size()) + require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 16.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 2, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 20.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 4, int(sw.buckets.Values()[2].(*bucket).qty)) + require.Equal(t, 13.0, sw.buckets.Values()[2].(*bucket).sum) + + // up until 15:04:05 we had 3 buckets + // let's advance the clock to 15:04:11 and the first data point should be evicted + clock.Set(ts("2023-04-21 15:04:11")) + require.Equal(t, 5.5, sw.Avg()) + require.Equal(t, 33.0, sw.Sum()) + require.Equal(t, 6, int(sw.Count())) + require.Equal(t, 2, sw.buckets.Size()) + require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 20.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 4, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 13.0, sw.buckets.Values()[1].(*bucket).sum) + + // let's advance the clock to 15:04:12 and another data point should be evicted + clock.Set(ts("2023-04-21 15:04:12")) + require.Equal(t, 3.25, sw.Avg()) + require.Equal(t, 13.0, sw.Sum()) + require.Equal(t, 4, int(sw.Count())) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 4, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 13.0, sw.buckets.Values()[0].(*bucket).sum) + + // let's advance the clock to 15:04:25 and all data point should be evicted + clock.Set(ts("2023-04-21 15:04:25")) + require.Equal(t, 0.0, sw.Avg()) + require.Equal(t, 0, sw.buckets.Size()) +} + +func TestSlidingWindow_CustomBucket(t *testing.T) { + now := ts("2023-04-21 15:04:05") + clock := NewAdjustableClock(now) + + sw := NewSlidingWindow( + WithWindowLength(30*time.Second), + WithBucketSize(10*time.Second), + WithClock(clock)) + sw.AddWithTime(ts("2023-04-21 15:03:49"), 5) // key: 03:50, sum: 5.0 + sw.AddWithTime(ts("2023-04-21 15:04:02"), 15) // key: 04:00 + sw.AddWithTime(ts("2023-04-21 15:04:03"), 5) // key: 04:00 + sw.AddWithTime(ts("2023-04-21 15:04:04"), 1) // key: 04:00, sum: 21.0 + sw.AddWithTime(ts("2023-04-21 15:04:05"), 3) // key: 04:10, sum: 3.0 + require.Equal(t, 5.8, sw.Avg()) + require.Equal(t, 29.0, sw.Sum()) + require.Equal(t, 5, int(sw.Count())) + require.Equal(t, 3, sw.buckets.Size()) + require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 21.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 3, int(sw.buckets.Values()[1].(*bucket).qty)) + require.Equal(t, 3.0, sw.buckets.Values()[2].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty)) + + // up until 15:04:05 we had 3 buckets + // let's advance the clock to 15:04:21 and the first data point should be evicted + clock.Set(ts("2023-04-21 15:04:21")) + require.Equal(t, 6.0, sw.Avg()) + require.Equal(t, 24.0, sw.Sum()) + require.Equal(t, 4, int(sw.Count())) + require.Equal(t, 2, sw.buckets.Size()) + require.Equal(t, 21.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty)) + require.Equal(t, 3.0, sw.buckets.Values()[1].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty)) + + // let's advance the clock to 15:04:32 and another data point should be evicted + clock.Set(ts("2023-04-21 15:04:32")) + require.Equal(t, 3.0, sw.Avg()) + require.Equal(t, 3.0, sw.Sum()) + require.Equal(t, 1, sw.buckets.Size()) + require.Equal(t, 1, int(sw.Count())) + require.Equal(t, 3.0, sw.buckets.Values()[0].(*bucket).sum) + require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty)) + + // let's advance the clock to 15:04:46 and all data point should be evicted + clock.Set(ts("2023-04-21 15:04:46")) + require.Equal(t, 0.0, sw.Avg()) + require.Equal(t, 0.0, sw.Sum()) + require.Equal(t, 0, int(sw.Count())) + require.Equal(t, 0, sw.buckets.Size()) +} + +// ts is a convenient method that must parse a time.Time from a string in format `"2006-01-02 15:04:05"` +func ts(s string) time.Time { + format := "2006-01-02 15:04:05" + t, err := time.Parse(format, s) + if err != nil { + panic(err) + } + return t +}