refactor metrics, add metric debug

This commit is contained in:
Felipe Andrade 2023-07-14 14:08:02 -07:00
parent b4f0ede95e
commit 668228c5c3
9 changed files with 106 additions and 47 deletions

@ -27,6 +27,7 @@ type SignerServiceConfig struct {
type MetricsConfig struct { type MetricsConfig struct {
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
Debug bool `toml:"debug"`
Host string `toml:"host"` Host string `toml:"host"`
Port int `toml:"port"` Port int `toml:"port"`
} }
@ -54,9 +55,8 @@ type WalletConfig struct {
} }
type ProviderConfig struct { type ProviderConfig struct {
Disabled bool `toml:"disabled"` Network string `toml:"network"`
Network string `toml:"network"` URL string `toml:"url"`
URL string `toml:"url"`
ReadOnly bool `toml:"read_only"` ReadOnly bool `toml:"read_only"`
ReadInterval TOMLDuration `toml:"read_interval"` ReadInterval TOMLDuration `toml:"read_interval"`

@ -33,7 +33,7 @@ func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash comm
start := time.Now() start := time.Now()
tx, isPending, err := i.c.TransactionByHash(ctx, hash) tx, isPending, err := i.c.TransactionByHash(ctx, hash)
if err != nil { if err != nil {
if !i.IgnorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionByHash") metrics.RecordError(i.providerName, "ethclient.TransactionByHash")
} }
return nil, false, err return nil, false, err
@ -57,7 +57,7 @@ func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash c
start := time.Now() start := time.Now()
receipt, err := i.c.TransactionReceipt(ctx, txHash) receipt, err := i.c.TransactionReceipt(ctx, txHash)
if err != nil { if err != nil {
if !i.IgnorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionReceipt") metrics.RecordError(i.providerName, "ethclient.TransactionReceipt")
} }
return nil, err return nil, err
@ -70,7 +70,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
start := time.Now() start := time.Now()
err := i.c.SendTransaction(ctx, tx) err := i.c.SendTransaction(ctx, tx)
if err != nil { if err != nil {
if !i.IgnorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.SendTransaction") metrics.RecordError(i.providerName, "ethclient.SendTransaction")
} }
return err return err
@ -79,7 +79,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
return err return err
} }
func (i *InstrumentedEthClient) IgnorableErrors(err error) bool { func (i *InstrumentedEthClient) ignorableErrors(err error) bool {
msg := err.Error() msg := err.Error()
// we dont use errors.Is because eth client actually uses errors.New, // we dont use errors.Is because eth client actually uses errors.New,
// therefore creating an incomparable instance :( // therefore creating an incomparable instance :(

@ -3,6 +3,7 @@ package metrics
import ( import (
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
) )
@ -12,6 +13,8 @@ const (
) )
var ( var (
Debug bool
errorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ errorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Name: "errors_total", Name: "errors_total",
@ -75,29 +78,57 @@ var (
) )
func RecordError(provider string, errorLabel string) { func RecordError(provider string, errorLabel string) {
if Debug {
log.Debug("metric inc", "m", "errors_total",
"provider", provider, "error", errorLabel)
}
errorsTotal.WithLabelValues(provider, errorLabel).Inc() errorsTotal.WithLabelValues(provider, errorLabel).Inc()
} }
func RecordRPCLatency(provider string, client string, method string, latency time.Duration) { func RecordRPCLatency(provider string, client string, method string, latency time.Duration) {
if Debug {
log.Debug("metric set", "m", "rpc_latency",
"provider", provider, "client", client, "method", method, "latency", latency)
}
rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds())) rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds()))
} }
func RecordRoundTripLatency(provider string, latency time.Duration) { func RecordRoundTripLatency(provider string, latency time.Duration) {
if Debug {
log.Debug("metric set", "m", "roundtrip_latency",
"provider", provider, "latency", latency)
}
roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds())) roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds()))
} }
func RecordGasUsed(provider string, val uint64) { func RecordGasUsed(provider string, val uint64) {
gasUsed.WithLabelValues(provider).Set(float64(val)) if Debug {
log.Debug("metric add", "m", "gas_used",
"provider", provider, "val", val)
}
gasUsed.WithLabelValues(provider).Add(float64(val))
} }
func RecordFirstSeenLatency(provider_source string, provider_seen string, latency time.Duration) { func RecordFirstSeenLatency(providerSource string, providerSeen string, latency time.Duration) {
firstSeenLatency.WithLabelValues(provider_source, provider_seen).Set(float64(latency.Milliseconds())) if Debug {
log.Debug("metric set", "m", "first_seen_latency",
"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency)
}
firstSeenLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds()))
} }
func RecordProviderToProviderLatency(provider_source string, provider_seen string, latency time.Duration) { func RecordProviderToProviderLatency(providerSource string, providerSeen string, latency time.Duration) {
firstSeenLatency.WithLabelValues(provider_source, provider_seen).Set(float64(latency.Milliseconds())) if Debug {
log.Debug("metric set", "m", "provider_to_provider_latency",
"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency)
}
providerToProviderLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds()))
} }
func RecordTransactionsInFlight(network string, count int) { func RecordTransactionsInFlight(network string, count int) {
if Debug {
log.Debug("metric set", "m", "transactions_inflight",
"network", network, "count", count)
}
networkTransactionsInFlight.WithLabelValues(network).Set(float64(count)) networkTransactionsInFlight.WithLabelValues(network).Set(float64(count))
} }

@ -55,19 +55,19 @@ func (p *Provider) Heartbeat(ctx context.Context) {
// mark transaction as seen by this provider // mark transaction as seen by this provider
st.M.Lock() st.M.Lock()
latency := time.Since(st.SentAt)
if st.FirstSeen.IsZero() { if st.FirstSeen.IsZero() {
st.FirstSeen = time.Now() st.FirstSeen = time.Now()
firstSeenLatency := time.Since(st.SentAt) metrics.RecordFirstSeenLatency(st.ProviderSentTo, p.name, latency)
metrics.RecordFirstSeenLatency(st.ProviderSentTo, p.name, time.Since(st.SentAt))
log.Info("transaction first seen", log.Info("transaction first seen",
"hash", hash, "hash", hash,
"firstSeenLatency", firstSeenLatency, "firstSeenLatency", latency,
"provider_source", st.ProviderSentTo, "providerSource", st.ProviderSentTo,
"provider_seen", p.name) "providerSeen", p.name)
} }
if _, exist := st.SeenBy[p.name]; !exist { if _, exist := st.SeenBy[p.name]; !exist {
st.SeenBy[p.name] = time.Now() st.SeenBy[p.name] = time.Now()
metrics.RecordProviderToProviderLatency(st.ProviderSentTo, p.name, time.Since(st.SentAt)) metrics.RecordProviderToProviderLatency(st.ProviderSentTo, p.name, latency)
} }
st.M.Unlock() st.M.Unlock()

@ -2,7 +2,6 @@ package provider
import ( import (
"context" "context"
"net/http"
"op-ufm/pkg/config" "op-ufm/pkg/config"
"time" "time"
) )
@ -13,9 +12,8 @@ type Provider struct {
signerConfig *config.SignerServiceConfig signerConfig *config.SignerServiceConfig
walletConfig *config.WalletConfig walletConfig *config.WalletConfig
txPool *NetworkTransactionPool txPool *NetworkTransactionPool
cancelFunc context.CancelFunc
client *http.Client cancelFunc context.CancelFunc
} }
func New(name string, cfg *config.ProviderConfig, func New(name string, cfg *config.ProviderConfig,
@ -28,8 +26,6 @@ func New(name string, cfg *config.ProviderConfig,
signerConfig: signerConfig, signerConfig: signerConfig,
walletConfig: walletConfig, walletConfig: walletConfig,
txPool: txPool, txPool: txPool,
client: http.DefaultClient,
} }
return p return p
} }
@ -37,6 +33,7 @@ func New(name string, cfg *config.ProviderConfig,
func (p *Provider) Start(ctx context.Context) { func (p *Provider) Start(ctx context.Context) {
providerCtx, cancelFunc := context.WithCancel(ctx) providerCtx, cancelFunc := context.WithCancel(ctx)
p.cancelFunc = cancelFunc p.cancelFunc = cancelFunc
schedule(providerCtx, time.Duration(p.config.ReadInterval), p.Heartbeat) schedule(providerCtx, time.Duration(p.config.ReadInterval), p.Heartbeat)
if !p.config.ReadOnly { if !p.config.ReadOnly {
schedule(providerCtx, time.Duration(p.config.SendInterval), p.RoundTrip) schedule(providerCtx, time.Duration(p.config.SendInterval), p.RoundTrip)

@ -20,7 +20,7 @@ import (
// RoundTrip send a new transaction to measure round trip latency // RoundTrip send a new transaction to measure round trip latency
func (p *Provider) RoundTrip(ctx context.Context) { func (p *Provider) RoundTrip(ctx context.Context) {
log.Debug("roundtrip", "provider", p.name) log.Debug("roundTripLatency", "provider", p.name)
client, err := iclients.Dial(p.name, p.config.URL) client, err := iclients.Dial(p.name, p.config.URL)
if err != nil { if err != nil {
@ -36,7 +36,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
txHash := common.Hash{} txHash := common.Hash{}
attempt := 0 attempt := 0
startedAt := time.Now() // used for timeout
firstAttemptAt := time.Now()
// used for actual round trip time (disregard retry time)
roundTripStartedAt := time.Now()
for { for {
tx := p.createTx(nonce) tx := p.createTx(nonce)
txHash = tx.Hash() txHash = tx.Hash()
@ -49,11 +52,12 @@ func (p *Provider) RoundTrip(ctx context.Context) {
txHash = signedTx.Hash() txHash = signedTx.Hash()
roundTripStartedAt = time.Now()
err = client.SendTransaction(ctx, signedTx) err = client.SendTransaction(ctx, signedTx)
if err != nil { if err != nil {
if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == core.ErrNonceTooLow.Error() { if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == core.ErrNonceTooLow.Error() {
if time.Since(startedAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) {
log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(startedAt), "attempt", attempt, "nonce", nonce) log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(firstAttemptAt), "attempt", attempt, "nonce", nonce)
metrics.RecordError(p.name, "ethclient.SendTransaction.nonce") metrics.RecordError(p.name, "ethclient.SendTransaction.nonce")
return return
} }
@ -62,7 +66,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
nonce++ nonce++
attempt++ attempt++
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(startedAt)) log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(firstAttemptAt))
} }
} else { } else {
log.Error("cant send transaction", "provider", p.name, "err", err) log.Error("cant send transaction", "provider", p.name, "err", err)
@ -104,13 +108,14 @@ func (p *Provider) RoundTrip(ctx context.Context) {
} }
attempt++ attempt++
} }
roundtrip := time.Since(sentAt)
metrics.RecordRoundTripLatency(p.name, roundtrip)
roundTripLatency := time.Since(roundTripStartedAt)
metrics.RecordRoundTripLatency(p.name, roundTripLatency)
metrics.RecordGasUsed(p.name, receipt.GasUsed) metrics.RecordGasUsed(p.name, receipt.GasUsed)
log.Info("got transaction receipt", "hash", txHash.Hex(), log.Info("got transaction receipt", "hash", txHash.Hex(),
"roundtrip", roundtrip, "roundTripLatency", roundTripLatency,
"provider", p.name, "provider", p.name,
"blockNumber", receipt.BlockNumber, "blockNumber", receipt.BlockNumber,
"blockHash", receipt.BlockHash, "blockHash", receipt.BlockHash,

@ -9,12 +9,12 @@ import (
"github.com/rs/cors" "github.com/rs/cors"
) )
type Healthz struct { type HealthzServer struct {
ctx context.Context ctx context.Context
server *http.Server server *http.Server
} }
func (h *Healthz) Start(ctx context.Context, host string, port int) error { func (h *HealthzServer) Start(ctx context.Context, host string, port int) error {
hdlr := mux.NewRouter() hdlr := mux.NewRouter()
hdlr.HandleFunc("/healthz", h.Handle).Methods("GET") hdlr.HandleFunc("/healthz", h.Handle).Methods("GET")
addr := fmt.Sprintf("%s:%d", host, port) addr := fmt.Sprintf("%s:%d", host, port)
@ -30,10 +30,10 @@ func (h *Healthz) Start(ctx context.Context, host string, port int) error {
return h.server.ListenAndServe() return h.server.ListenAndServe()
} }
func (h *Healthz) Shutdown() error { func (h *HealthzServer) Shutdown() error {
return h.server.Shutdown(h.ctx) return h.server.Shutdown(h.ctx)
} }
func (h *Healthz) Handle(w http.ResponseWriter, r *http.Request) { func (h *HealthzServer) Handle(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK")) w.Write([]byte("OK"))
} }

@ -0,0 +1,27 @@
package service
import (
"context"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type MetricsServer struct {
ctx context.Context
server *http.Server
}
func (m *MetricsServer) Start(ctx context.Context, addr string) error {
server := &http.Server{
Handler: promhttp.Handler(),
Addr: addr,
}
m.server = server
m.ctx = ctx
return m.server.ListenAndServe()
}
func (m *MetricsServer) Shutdown() error {
return m.server.Shutdown(m.ctx)
}

@ -3,24 +3,25 @@ package service
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"op-ufm/pkg/config" "op-ufm/pkg/config"
"op-ufm/pkg/metrics"
"op-ufm/pkg/provider" "op-ufm/pkg/provider"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
type Service struct { type Service struct {
Config *config.Config Config *config.Config
Healthz *Healthz Healthz *HealthzServer
Metrics *MetricsServer
Providers map[string]*provider.Provider Providers map[string]*provider.Provider
} }
func New(cfg *config.Config) *Service { func New(cfg *config.Config) *Service {
s := &Service{ s := &Service{
Config: cfg, Config: cfg,
Healthz: &Healthz{}, Healthz: &HealthzServer{},
Metrics: &MetricsServer{},
Providers: make(map[string]*provider.Provider, len(cfg.Providers)), Providers: make(map[string]*provider.Provider, len(cfg.Providers)),
} }
return s return s
@ -38,11 +39,12 @@ func (s *Service) Start(ctx context.Context) {
}() }()
} }
metrics.Debug = s.Config.Metrics.Debug
if s.Config.Metrics.Enabled { if s.Config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", s.Config.Metrics.Host, s.Config.Metrics.Port) addr := fmt.Sprintf("%s:%d", s.Config.Metrics.Host, s.Config.Metrics.Port)
log.Info("starting metrics server", "addr", addr) log.Info("starting metrics server", "addr", addr)
go func() { go func() {
if err := http.ListenAndServe(addr, promhttp.Handler()); err != nil { if err := s.Metrics.Start(ctx, addr); err != nil {
log.Error("error starting metrics server", "err", err) log.Error("error starting metrics server", "err", err)
} }
}() }()
@ -51,9 +53,6 @@ func (s *Service) Start(ctx context.Context) {
// map networks to its providers // map networks to its providers
networks := make(map[string][]string) networks := make(map[string][]string)
for name, providerConfig := range s.Config.Providers { for name, providerConfig := range s.Config.Providers {
if providerConfig.Disabled {
continue
}
networks[providerConfig.Network] = append(networks[providerConfig.Network], name) networks[providerConfig.Network] = append(networks[providerConfig.Network], name)
} }
@ -70,10 +69,6 @@ func (s *Service) Start(ctx context.Context) {
} }
for name, providerConfig := range s.Config.Providers { for name, providerConfig := range s.Config.Providers {
if providerConfig.Disabled {
log.Info("provider is disabled", "provider", name)
continue
}
s.Providers[name] = provider.New(name, s.Providers[name] = provider.New(name,
providerConfig, providerConfig,
&s.Config.Signer, &s.Config.Signer,
@ -92,6 +87,10 @@ func (s *Service) Shutdown() {
s.Healthz.Shutdown() s.Healthz.Shutdown()
log.Info("healthz stopped") log.Info("healthz stopped")
} }
if s.Config.Metrics.Enabled {
s.Metrics.Shutdown()
log.Info("metrics stopped")
}
for name, provider := range s.Providers { for name, provider := range s.Providers {
provider.Shutdown() provider.Shutdown()
log.Info("provider stopped", "provider", name) log.Info("provider stopped", "provider", name)