sliding window thread safe
This commit is contained in:
parent
26f7d10e16
commit
f447404608
@ -1,6 +1,7 @@
|
||||
package avg_sliding_window
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
lm "github.com/emirpasic/gods/maps/linkedhashmap"
|
||||
@ -44,6 +45,7 @@ type bucket struct {
|
||||
// Data points are rounded to nearest bucket of size `bucketSize`,
|
||||
// and evicted when they are too old based on `windowLength`
|
||||
type AvgSlidingWindow struct {
|
||||
mux sync.Mutex
|
||||
bucketSize time.Duration
|
||||
windowLength time.Duration
|
||||
clock Clock
|
||||
@ -112,6 +114,9 @@ func (sw *AvgSlidingWindow) Incr() {
|
||||
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
|
||||
sw.advance()
|
||||
|
||||
defer sw.mux.Unlock()
|
||||
sw.mux.Lock()
|
||||
|
||||
key := t.Round(sw.bucketSize)
|
||||
if !sw.inWindow(key) {
|
||||
return
|
||||
@ -139,6 +144,8 @@ func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
|
||||
|
||||
// advance evicts old data points
|
||||
func (sw *AvgSlidingWindow) advance() {
|
||||
defer sw.mux.Unlock()
|
||||
sw.mux.Lock()
|
||||
now := sw.clock.Now().Round(sw.bucketSize)
|
||||
windowStart := now.Add(-sw.windowLength)
|
||||
keys := sw.buckets.Keys()
|
||||
|
Loading…
Reference in New Issue
Block a user