diff --git a/proxyd/proxyd/pkg/avg-sliding-window/sliding.go b/proxyd/proxyd/pkg/avg-sliding-window/sliding.go index b0f2873..70c40be 100644 --- a/proxyd/proxyd/pkg/avg-sliding-window/sliding.go +++ b/proxyd/proxyd/pkg/avg-sliding-window/sliding.go @@ -1,6 +1,7 @@ package avg_sliding_window import ( + "sync" "time" lm "github.com/emirpasic/gods/maps/linkedhashmap" @@ -44,6 +45,7 @@ type bucket struct { // Data points are rounded to nearest bucket of size `bucketSize`, // and evicted when they are too old based on `windowLength` type AvgSlidingWindow struct { + mux sync.Mutex bucketSize time.Duration windowLength time.Duration clock Clock @@ -112,6 +114,9 @@ func (sw *AvgSlidingWindow) Incr() { func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) { sw.advance() + defer sw.mux.Unlock() + sw.mux.Lock() + key := t.Round(sw.bucketSize) if !sw.inWindow(key) { return @@ -139,6 +144,8 @@ func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) { // advance evicts old data points func (sw *AvgSlidingWindow) advance() { + defer sw.mux.Unlock() + sw.mux.Lock() now := sw.clock.Now().Round(sw.bucketSize) windowStart := now.Add(-sw.windowLength) keys := sw.buckets.Keys()