metrics: use sync.map in registry (#27159)

This commit is contained in:
Exca-DK 2023-05-11 11:39:13 +02:00 committed by GitHub
parent a14301823e
commit a340721aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 55 deletions

@ -45,21 +45,17 @@ type Registry interface {
// Unregister the metric with the given name. // Unregister the metric with the given name.
Unregister(string) Unregister(string)
// Unregister all metrics. (Mostly for testing.)
UnregisterAll()
} }
// The standard implementation of a Registry is a mutex-protected map // The standard implementation of a Registry uses sync.map
// of names to metrics. // of names to metrics.
type StandardRegistry struct { type StandardRegistry struct {
metrics map[string]interface{} metrics sync.Map
mutex sync.Mutex
} }
// Create a new registry. // Create a new registry.
func NewRegistry() Registry { func NewRegistry() Registry {
return &StandardRegistry{metrics: make(map[string]interface{})} return &StandardRegistry{}
} }
// Call the given function for each registered metric. // Call the given function for each registered metric.
@ -71,9 +67,8 @@ func (r *StandardRegistry) Each(f func(string, interface{})) {
// Get the metric by the given name or nil if none is registered. // Get the metric by the given name or nil if none is registered.
func (r *StandardRegistry) Get(name string) interface{} { func (r *StandardRegistry) Get(name string) interface{} {
r.mutex.Lock() item, _ := r.metrics.Load(name)
defer r.mutex.Unlock() return item
return r.metrics[name]
} }
// Gets an existing metric or creates and registers a new one. Threadsafe // Gets an existing metric or creates and registers a new one. Threadsafe
@ -81,35 +76,48 @@ func (r *StandardRegistry) Get(name string) interface{} {
// The interface can be the metric to register if not found in registry, // The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation. // or a function returning the metric for lazy instantiation.
func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{} { func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{} {
r.mutex.Lock() // fast path
defer r.mutex.Unlock() cached, ok := r.metrics.Load(name)
if metric, ok := r.metrics[name]; ok { if ok {
return metric return cached
} }
if v := reflect.ValueOf(i); v.Kind() == reflect.Func { if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface() i = v.Call(nil)[0].Interface()
} }
r.register(name, i) item, _, ok := r.loadOrRegister(name, i)
if !ok {
return i return i
} }
return item
}
// Register the given metric under the given name. Returns a DuplicateMetric // Register the given metric under the given name. Returns a DuplicateMetric
// if a metric by the given name is already registered. // if a metric by the given name is already registered.
func (r *StandardRegistry) Register(name string, i interface{}) error { func (r *StandardRegistry) Register(name string, i interface{}) error {
r.mutex.Lock() // fast path
defer r.mutex.Unlock() _, ok := r.metrics.Load(name)
return r.register(name, i) if ok {
return DuplicateMetric(name)
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
_, loaded, _ := r.loadOrRegister(name, i)
if loaded {
return DuplicateMetric(name)
}
return nil
} }
// Run all registered healthchecks. // Run all registered healthchecks.
func (r *StandardRegistry) RunHealthchecks() { func (r *StandardRegistry) RunHealthchecks() {
r.mutex.Lock() r.metrics.Range(func(key, value any) bool {
defer r.mutex.Unlock() if h, ok := value.(Healthcheck); ok {
for _, i := range r.metrics {
if h, ok := i.(Healthcheck); ok {
h.Check() h.Check()
} }
} return true
})
} }
// GetAll metrics in the Registry // GetAll metrics in the Registry
@ -177,45 +185,31 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {
// Unregister the metric with the given name. // Unregister the metric with the given name.
func (r *StandardRegistry) Unregister(name string) { func (r *StandardRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.stop(name) r.stop(name)
delete(r.metrics, name) r.metrics.LoadAndDelete(name)
} }
// Unregister all metrics. (Mostly for testing.) func (r *StandardRegistry) loadOrRegister(name string, i interface{}) (interface{}, bool, bool) {
func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name := range r.metrics {
r.stop(name)
delete(r.metrics, name)
}
}
func (r *StandardRegistry) register(name string, i interface{}) error {
if _, ok := r.metrics[name]; ok {
return DuplicateMetric(name)
}
switch i.(type) { switch i.(type) {
case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
r.metrics[name] = i default:
return nil, false, false
} }
return nil item, loaded := r.metrics.LoadOrStore(name, i)
return item, loaded, true
} }
func (r *StandardRegistry) registered() map[string]interface{} { func (r *StandardRegistry) registered() map[string]interface{} {
r.mutex.Lock() metrics := make(map[string]interface{})
defer r.mutex.Unlock() r.metrics.Range(func(key, value any) bool {
metrics := make(map[string]interface{}, len(r.metrics)) metrics[key.(string)] = value
for name, i := range r.metrics { return true
metrics[name] = i })
}
return metrics return metrics
} }
func (r *StandardRegistry) stop(name string) { func (r *StandardRegistry) stop(name string) {
if i, ok := r.metrics[name]; ok { if i, ok := r.metrics.Load(name); ok {
if s, ok := i.(Stoppable); ok { if s, ok := i.(Stoppable); ok {
s.Stop() s.Stop()
} }
@ -308,11 +302,6 @@ func (r *PrefixedRegistry) Unregister(name string) {
r.underlying.Unregister(realName) r.underlying.Unregister(realName)
} }
// Unregister all metrics. (Mostly for testing.)
func (r *PrefixedRegistry) UnregisterAll() {
r.underlying.UnregisterAll()
}
var ( var (
DefaultRegistry = NewRegistry() DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry() EphemeralRegistry = NewRegistry()

@ -1,6 +1,7 @@
package metrics package metrics
import ( import (
"sync"
"testing" "testing"
) )
@ -13,6 +14,30 @@ func BenchmarkRegistry(b *testing.B) {
} }
} }
func BenchmarkRegistryGetOrRegisterParallel_8(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 8)
}
func BenchmarkRegistryGetOrRegisterParallel_32(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 32)
}
func benchmarkRegistryGetOrRegisterParallel(b *testing.B, amount int) {
r := NewRegistry()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < amount; i++ {
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
r.GetOrRegister("foo", NewMeter)
}
wg.Done()
}()
}
wg.Wait()
}
func TestRegistry(t *testing.T) { func TestRegistry(t *testing.T) {
r := NewRegistry() r := NewRegistry()
r.Register("foo", NewCounter()) r.Register("foo", NewCounter())