sliding window pkg
This commit is contained in:
parent
f332cfbcd0
commit
a61c8f3f2a
@ -31,6 +31,7 @@ require (
|
|||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/edsrzf/mmap-go v1.1.0 // indirect
|
github.com/edsrzf/mmap-go v1.1.0 // indirect
|
||||||
|
github.com/emirpasic/gods v1.18.1 // indirect
|
||||||
github.com/fjl/memsize v0.0.1 // indirect
|
github.com/fjl/memsize v0.0.1 // indirect
|
||||||
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
|
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
|
||||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||||
|
@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
|
|||||||
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||||
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
|
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
|
||||||
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
|
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
|
||||||
|
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
|
||||||
|
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||||
|
176
proxyd/proxyd/pkg/avg-sliding-window/sliding.go
Normal file
176
proxyd/proxyd/pkg/avg-sliding-window/sliding.go
Normal file
@ -0,0 +1,176 @@
|
|||||||
|
package avg_sliding_window
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
lm "github.com/emirpasic/gods/maps/linkedhashmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Clock interface {
|
||||||
|
Now() time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultClock provides a clock that gets current time from the system time
|
||||||
|
type DefaultClock struct{}
|
||||||
|
|
||||||
|
func NewDefaultClock() *DefaultClock {
|
||||||
|
return &DefaultClock{}
|
||||||
|
}
|
||||||
|
func (c DefaultClock) Now() time.Time {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// AdjustableClock provides a static clock to easily override the system time
|
||||||
|
type AdjustableClock struct {
|
||||||
|
now time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAdjustableClock(now time.Time) *AdjustableClock {
|
||||||
|
return &AdjustableClock{now: now}
|
||||||
|
}
|
||||||
|
func (c *AdjustableClock) Now() time.Time {
|
||||||
|
return c.now
|
||||||
|
}
|
||||||
|
func (c *AdjustableClock) Set(now time.Time) {
|
||||||
|
c.now = now
|
||||||
|
}
|
||||||
|
|
||||||
|
type bucket struct {
|
||||||
|
sum float64
|
||||||
|
qty uint
|
||||||
|
}
|
||||||
|
|
||||||
|
// AvgSlidingWindow calculates moving averages efficiently.
|
||||||
|
// Data points are rounded to nearest bucket of size `bucketSize`,
|
||||||
|
// and evicted when they are too old based on `windowLength`
|
||||||
|
type AvgSlidingWindow struct {
|
||||||
|
bucketSize time.Duration
|
||||||
|
windowLength time.Duration
|
||||||
|
clock Clock
|
||||||
|
buckets *lm.Map
|
||||||
|
qty uint
|
||||||
|
sum float64
|
||||||
|
}
|
||||||
|
|
||||||
|
type SlidingWindowOpts func(sw *AvgSlidingWindow)
|
||||||
|
|
||||||
|
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
|
||||||
|
sw := &AvgSlidingWindow{
|
||||||
|
buckets: lm.New(),
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(sw)
|
||||||
|
}
|
||||||
|
if sw.bucketSize == 0 {
|
||||||
|
sw.bucketSize = time.Second
|
||||||
|
}
|
||||||
|
if sw.windowLength == 0 {
|
||||||
|
sw.windowLength = 5 * time.Minute
|
||||||
|
}
|
||||||
|
if sw.clock == nil {
|
||||||
|
sw.clock = NewDefaultClock()
|
||||||
|
}
|
||||||
|
return sw
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
|
||||||
|
return func(sw *AvgSlidingWindow) {
|
||||||
|
sw.windowLength = windowLength
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
|
||||||
|
return func(sw *AvgSlidingWindow) {
|
||||||
|
sw.bucketSize = bucketSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithClock(clock Clock) SlidingWindowOpts {
|
||||||
|
return func(sw *AvgSlidingWindow) {
|
||||||
|
sw.clock = clock
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
|
||||||
|
now := sw.clock.Now().Round(sw.bucketSize)
|
||||||
|
windowStart := now.Add(-sw.windowLength)
|
||||||
|
return windowStart.Before(t) && !t.After(now)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add inserts a new data point into the window, with value `val` with the current time
|
||||||
|
func (sw *AvgSlidingWindow) Add(val float64) {
|
||||||
|
t := sw.clock.Now()
|
||||||
|
sw.AddWithTime(t, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
|
||||||
|
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
|
||||||
|
sw.advance()
|
||||||
|
|
||||||
|
key := t.Round(sw.bucketSize)
|
||||||
|
if !sw.inWindow(key) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var b *bucket
|
||||||
|
current, found := sw.buckets.Get(key)
|
||||||
|
if !found {
|
||||||
|
b = &bucket{}
|
||||||
|
} else {
|
||||||
|
b = current.(*bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// update bucket
|
||||||
|
bsum := b.sum
|
||||||
|
b.qty += 1
|
||||||
|
b.sum = bsum + val
|
||||||
|
|
||||||
|
// update window
|
||||||
|
wsum := sw.sum
|
||||||
|
sw.qty += 1
|
||||||
|
sw.sum = wsum - bsum + b.sum
|
||||||
|
sw.buckets.Put(key, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// advance evicts old data points
|
||||||
|
func (sw *AvgSlidingWindow) advance() {
|
||||||
|
now := sw.clock.Now().Round(sw.bucketSize)
|
||||||
|
windowStart := now.Add(-sw.windowLength)
|
||||||
|
keys := sw.buckets.Keys()
|
||||||
|
for _, k := range keys {
|
||||||
|
if k.(time.Time).After(windowStart) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
val, _ := sw.buckets.Get(k)
|
||||||
|
b := val.(*bucket)
|
||||||
|
sw.buckets.Remove(k)
|
||||||
|
if sw.buckets.Size() > 0 {
|
||||||
|
sw.qty -= b.qty
|
||||||
|
sw.sum = sw.sum - b.sum
|
||||||
|
} else {
|
||||||
|
sw.qty = 0
|
||||||
|
sw.sum = 0.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avg retrieves the current average for the sliding window
|
||||||
|
func (sw *AvgSlidingWindow) Avg() float64 {
|
||||||
|
sw.advance()
|
||||||
|
if sw.qty == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return sw.sum / float64(sw.qty)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sum retrieves the current sum for the sliding window
|
||||||
|
func (sw *AvgSlidingWindow) Sum() float64 {
|
||||||
|
sw.advance()
|
||||||
|
return sw.sum
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count retrieves the data point count for the sliding window
|
||||||
|
func (sw *AvgSlidingWindow) Count() uint {
|
||||||
|
sw.advance()
|
||||||
|
return sw.qty
|
||||||
|
}
|
278
proxyd/proxyd/pkg/avg-sliding-window/sliding_test.go
Normal file
278
proxyd/proxyd/pkg/avg-sliding-window/sliding_test.go
Normal file
@ -0,0 +1,278 @@
|
|||||||
|
package avg_sliding_window
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSlidingWindow_AddWithTime_Single(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 5.0, sw.Sum())
|
||||||
|
require.Equal(t, 1, int(sw.Count()))
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_AddWithTime_TwoValues_SameBucket(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 10.0, sw.Sum())
|
||||||
|
require.Equal(t, 2, int(sw.Count()))
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 10.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_AddWithTime_ThreeValues_SameBucket(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 4)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 15.0, sw.Sum())
|
||||||
|
require.Equal(t, 3, int(sw.Count()))
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 15.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_AddWithTime_ThreeValues_ThreeBuckets(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 15.0, sw.Sum())
|
||||||
|
require.Equal(t, 3, int(sw.Count()))
|
||||||
|
require.Equal(t, 3, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
|
||||||
|
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_AddWithTime_OutWindow(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:03:55"), 1000)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 15.0, sw.Sum())
|
||||||
|
require.Equal(t, 3, int(sw.Count()))
|
||||||
|
require.Equal(t, 3, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
|
||||||
|
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_AdvanceClock(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
|
||||||
|
require.Equal(t, 5.0, sw.Avg())
|
||||||
|
require.Equal(t, 15.0, sw.Sum())
|
||||||
|
require.Equal(t, 3, int(sw.Count()))
|
||||||
|
require.Equal(t, 3, sw.buckets.Size())
|
||||||
|
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 4.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
|
||||||
|
require.Equal(t, 6.0, sw.buckets.Values()[2].(*bucket).sum)
|
||||||
|
|
||||||
|
// up until 15:04:05 we had 3 buckets
|
||||||
|
// let's advance the clock to 15:04:11 and the first data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:11"))
|
||||||
|
require.Equal(t, 5.5, sw.Avg())
|
||||||
|
require.Equal(t, 11.0, sw.Sum())
|
||||||
|
require.Equal(t, 2, int(sw.Count()))
|
||||||
|
require.Equal(t, 2, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 6.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:12 and another data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:12"))
|
||||||
|
require.Equal(t, 6.0, sw.Avg())
|
||||||
|
require.Equal(t, 6.0, sw.Sum())
|
||||||
|
require.Equal(t, 1, int(sw.Count()))
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 6.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:25 and all data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:25"))
|
||||||
|
require.Equal(t, 0.0, sw.Avg())
|
||||||
|
require.Equal(t, 0.0, sw.Sum())
|
||||||
|
require.Equal(t, 0, int(sw.Count()))
|
||||||
|
require.Equal(t, 0, sw.buckets.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_MultipleValPerBucket(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(10*time.Second),
|
||||||
|
WithBucketSize(time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:01"), 4)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:01"), 12)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 5)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 15)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 6)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 1)
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3)
|
||||||
|
require.Equal(t, 6.125, sw.Avg())
|
||||||
|
require.Equal(t, 49.0, sw.Sum())
|
||||||
|
require.Equal(t, 8, int(sw.Count()))
|
||||||
|
require.Equal(t, 3, sw.buckets.Size())
|
||||||
|
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 16.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 2, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 20.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 4, int(sw.buckets.Values()[2].(*bucket).qty))
|
||||||
|
require.Equal(t, 13.0, sw.buckets.Values()[2].(*bucket).sum)
|
||||||
|
|
||||||
|
// up until 15:04:05 we had 3 buckets
|
||||||
|
// let's advance the clock to 15:04:11 and the first data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:11"))
|
||||||
|
require.Equal(t, 5.5, sw.Avg())
|
||||||
|
require.Equal(t, 33.0, sw.Sum())
|
||||||
|
require.Equal(t, 6, int(sw.Count()))
|
||||||
|
require.Equal(t, 2, sw.buckets.Size())
|
||||||
|
require.Equal(t, 2, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 20.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 4, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 13.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:12 and another data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:12"))
|
||||||
|
require.Equal(t, 3.25, sw.Avg())
|
||||||
|
require.Equal(t, 13.0, sw.Sum())
|
||||||
|
require.Equal(t, 4, int(sw.Count()))
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 4, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 13.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:25 and all data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:25"))
|
||||||
|
require.Equal(t, 0.0, sw.Avg())
|
||||||
|
require.Equal(t, 0, sw.buckets.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlidingWindow_CustomBucket(t *testing.T) {
|
||||||
|
now := ts("2023-04-21 15:04:05")
|
||||||
|
clock := NewAdjustableClock(now)
|
||||||
|
|
||||||
|
sw := NewSlidingWindow(
|
||||||
|
WithWindowLength(30*time.Second),
|
||||||
|
WithBucketSize(10*time.Second),
|
||||||
|
WithClock(clock))
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:03:49"), 5) // key: 03:50, sum: 5.0
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:02"), 15) // key: 04:00
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:03"), 5) // key: 04:00
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:04"), 1) // key: 04:00, sum: 21.0
|
||||||
|
sw.AddWithTime(ts("2023-04-21 15:04:05"), 3) // key: 04:10, sum: 3.0
|
||||||
|
require.Equal(t, 5.8, sw.Avg())
|
||||||
|
require.Equal(t, 29.0, sw.Sum())
|
||||||
|
require.Equal(t, 5, int(sw.Count()))
|
||||||
|
require.Equal(t, 3, sw.buckets.Size())
|
||||||
|
require.Equal(t, 5.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 21.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 3, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
require.Equal(t, 3.0, sw.buckets.Values()[2].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[2].(*bucket).qty))
|
||||||
|
|
||||||
|
// up until 15:04:05 we had 3 buckets
|
||||||
|
// let's advance the clock to 15:04:21 and the first data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:21"))
|
||||||
|
require.Equal(t, 6.0, sw.Avg())
|
||||||
|
require.Equal(t, 24.0, sw.Sum())
|
||||||
|
require.Equal(t, 4, int(sw.Count()))
|
||||||
|
require.Equal(t, 2, sw.buckets.Size())
|
||||||
|
require.Equal(t, 21.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 3, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
require.Equal(t, 3.0, sw.buckets.Values()[1].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[1].(*bucket).qty))
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:32 and another data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:32"))
|
||||||
|
require.Equal(t, 3.0, sw.Avg())
|
||||||
|
require.Equal(t, 3.0, sw.Sum())
|
||||||
|
require.Equal(t, 1, sw.buckets.Size())
|
||||||
|
require.Equal(t, 1, int(sw.Count()))
|
||||||
|
require.Equal(t, 3.0, sw.buckets.Values()[0].(*bucket).sum)
|
||||||
|
require.Equal(t, 1, int(sw.buckets.Values()[0].(*bucket).qty))
|
||||||
|
|
||||||
|
// let's advance the clock to 15:04:46 and all data point should be evicted
|
||||||
|
clock.Set(ts("2023-04-21 15:04:46"))
|
||||||
|
require.Equal(t, 0.0, sw.Avg())
|
||||||
|
require.Equal(t, 0.0, sw.Sum())
|
||||||
|
require.Equal(t, 0, int(sw.Count()))
|
||||||
|
require.Equal(t, 0, sw.buckets.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts is a convenient method that must parse a time.Time from a string in format `"2006-01-02 15:04:05"`
|
||||||
|
func ts(s string) time.Time {
|
||||||
|
format := "2006-01-02 15:04:05"
|
||||||
|
t, err := time.Parse(format, s)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user