Merge pull request #7079 from ethereum-optimism/felipe/track-nonce-locally

feat(ufm): track nonce locally
This commit is contained in:
OptimismBot 2023-08-31 15:48:50 -07:00 committed by GitHub
commit c5054ae0c6
6 changed files with 161 additions and 53 deletions

@ -28,7 +28,10 @@ func main() {
), ),
) )
log.Info("initializing", "version", GitVersion, "commit", GitCommit, "date", GitDate) log.Info("initializing",
"version", GitVersion,
"commit", GitCommit,
"date", GitDate)
if len(os.Args) < 2 { if len(os.Args) < 2 {
log.Crit("must specify a config file on the command line") log.Crit("must specify a config file on the command line")
@ -42,7 +45,8 @@ func main() {
sig := make(chan os.Signal, 1) sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
recvSig := <-sig recvSig := <-sig
log.Info("caught signal, shutting down", "signal", recvSig) log.Info("caught signal, shutting down",
"signal", recvSig)
svc.Shutdown() svc.Shutdown()
} }
@ -50,7 +54,9 @@ func main() {
func initConfig(cfgFile string) *config.Config { func initConfig(cfgFile string) *config.Config {
cfg, err := config.New(cfgFile) cfg, err := config.New(cfgFile)
if err != nil { if err != nil {
log.Crit("error reading config file", "file", cfgFile, "err", err) log.Crit("error reading config file",
"file", cfgFile,
"err", err)
} }
// update log level from config // update log level from config
@ -58,7 +64,8 @@ func initConfig(cfgFile string) *config.Config {
if err != nil { if err != nil {
logLevel = log.LvlInfo logLevel = log.LvlInfo
if cfg.LogLevel != "" { if cfg.LogLevel != "" {
log.Warn("invalid server.log_level set: " + cfg.LogLevel) log.Warn("invalid server.log_level",
"log_level", cfg.LogLevel)
} }
} }
log.Root().SetHandler( log.Root().SetHandler(
@ -74,7 +81,8 @@ func initConfig(cfgFile string) *config.Config {
err = cfg.Validate() err = cfg.Validate()
if err != nil { if err != nil {
log.Crit("invalid config", "err", err) log.Crit("invalid config",
"err", err)
} }
return cfg return cfg

@ -84,8 +84,10 @@ var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z ]+`)
func RecordError(provider string, errorLabel string) { func RecordError(provider string, errorLabel string) {
if Debug { if Debug {
log.Debug("metric inc", "m", "errors_total", log.Debug("metric inc",
"provider", provider, "error", errorLabel) "m", "errors_total",
"provider", provider,
"error", errorLabel)
} }
errorsTotal.WithLabelValues(provider, errorLabel).Inc() errorsTotal.WithLabelValues(provider, errorLabel).Inc()
} }
@ -101,48 +103,64 @@ func RecordErrorDetails(provider string, label string, err error) {
func RecordRPCLatency(provider string, client string, method string, latency time.Duration) { func RecordRPCLatency(provider string, client string, method string, latency time.Duration) {
if Debug { if Debug {
log.Debug("metric set", "m", "rpc_latency", log.Debug("metric set",
"provider", provider, "client", client, "method", method, "latency", latency) "m", "rpc_latency",
"provider", provider,
"client", client,
"method", method,
"latency", latency)
} }
rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds())) rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds()))
} }
func RecordRoundTripLatency(provider string, latency time.Duration) { func RecordRoundTripLatency(provider string, latency time.Duration) {
if Debug { if Debug {
log.Debug("metric set", "m", "roundtrip_latency", log.Debug("metric set",
"provider", provider, "latency", latency) "m", "roundtrip_latency",
"provider", provider,
"latency", latency)
} }
roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds())) roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds()))
} }
func RecordGasUsed(provider string, val uint64) { func RecordGasUsed(provider string, val uint64) {
if Debug { if Debug {
log.Debug("metric add", "m", "gas_used", log.Debug("metric add",
"provider", provider, "val", val) "m", "gas_used",
"provider", provider,
"val", val)
} }
gasUsed.WithLabelValues(provider).Set(float64(val)) gasUsed.WithLabelValues(provider).Set(float64(val))
} }
func RecordFirstSeenLatency(providerSource string, providerSeen string, latency time.Duration) { func RecordFirstSeenLatency(providerSource string, providerSeen string, latency time.Duration) {
if Debug { if Debug {
log.Debug("metric set", "m", "first_seen_latency", log.Debug("metric set",
"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency) "m", "first_seen_latency",
"provider_source", providerSource,
"provider_seen", providerSeen,
"latency", latency)
} }
firstSeenLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds())) firstSeenLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds()))
} }
func RecordProviderToProviderLatency(providerSource string, providerSeen string, latency time.Duration) { func RecordProviderToProviderLatency(providerSource string, providerSeen string, latency time.Duration) {
if Debug { if Debug {
log.Debug("metric set", "m", "provider_to_provider_latency", log.Debug("metric set",
"provider_source", providerSource, "provider_seen", providerSeen, "latency", latency) "m", "provider_to_provider_latency",
"provider_source", providerSource,
"provider_seen", providerSeen,
"latency", latency)
} }
providerToProviderLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds())) providerToProviderLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds()))
} }
func RecordTransactionsInFlight(network string, count int) { func RecordTransactionsInFlight(network string, count int) {
if Debug { if Debug {
log.Debug("metric set", "m", "transactions_inflight", log.Debug("metric set",
"network", network, "count", count) "m", "transactions_inflight",
"network", network,
"count", count)
} }
networkTransactionsInFlight.WithLabelValues(network).Set(float64(count)) networkTransactionsInFlight.WithLabelValues(network).Set(float64(count))
} }

@ -14,7 +14,9 @@ import (
// Heartbeat polls for expected in-flight transactions // Heartbeat polls for expected in-flight transactions
func (p *Provider) Heartbeat(ctx context.Context) { func (p *Provider) Heartbeat(ctx context.Context) {
log.Debug("heartbeat", "provider", p.name, "inflight", len(p.txPool.Transactions)) log.Debug("heartbeat",
"provider", p.name,
"count", len(p.txPool.Transactions))
metrics.RecordTransactionsInFlight(p.config.Network, len(p.txPool.Transactions)) metrics.RecordTransactionsInFlight(p.config.Network, len(p.txPool.Transactions))
@ -33,26 +35,42 @@ func (p *Provider) Heartbeat(ctx context.Context) {
} }
if len(expectedTransactions) == 0 { if len(expectedTransactions) == 0 {
log.Debug("no expected txs", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen) log.Debug("no expected txs",
"count", len(p.txPool.Transactions),
"provider", p.name,
"alreadySeen", alreadySeen)
return return
} }
client, err := clients.Dial(p.name, p.config.URL) client, err := clients.Dial(p.name, p.config.URL)
if err != nil { if err != nil {
log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) log.Error("cant dial to provider",
"provider", p.name,
"url", p.config.URL,
"err", err)
} }
log.Debug("checking in-flight tx", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen) log.Debug("checking in-flight tx",
"count", len(p.txPool.Transactions),
"provider", p.name,
"alreadySeen", alreadySeen)
for _, st := range expectedTransactions { for _, st := range expectedTransactions {
hash := st.Hash.Hex() hash := st.Hash.Hex()
_, isPending, err := client.TransactionByHash(ctx, st.Hash) _, isPending, err := client.TransactionByHash(ctx, st.Hash)
if err != nil && !errors.Is(err, ethereum.NotFound) { if err != nil && !errors.Is(err, ethereum.NotFound) {
log.Error("cant check transaction", "provider", p.name, "hash", hash, "url", p.config.URL, "err", err) log.Error("cant check transaction",
"provider", p.name,
"hash", hash,
"url", p.config.URL,
"err", err)
continue continue
} }
log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending) log.Debug("got transaction",
"provider", p.name,
"hash", hash,
"isPending", isPending)
// mark transaction as seen by this provider // mark transaction as seen by this provider
st.M.Lock() st.M.Lock()
@ -75,7 +93,10 @@ func (p *Provider) Heartbeat(ctx context.Context) {
// check if transaction have been seen by all providers // check if transaction have been seen by all providers
p.txPool.M.Lock() p.txPool.M.Lock()
if len(st.SeenBy) == p.txPool.Expected { if len(st.SeenBy) == p.txPool.Expected {
log.Debug("transaction seen by all", "hash", hash, "expected", p.txPool.Expected, "seenBy", len(st.SeenBy)) log.Debug("transaction seen by all",
"hash", hash,
"expected", p.txPool.Expected,
"seenBy", len(st.SeenBy))
delete(p.txPool.Transactions, st.Hash.Hex()) delete(p.txPool.Transactions, st.Hash.Hex())
} }
p.txPool.M.Unlock() p.txPool.M.Unlock()

@ -6,11 +6,11 @@ import (
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
iclients "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients" iclients "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum-optimism/optimism/op-service/tls" "github.com/ethereum-optimism/optimism/op-service/tls"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -21,19 +21,35 @@ import (
// RoundTrip send a new transaction to measure round trip latency // RoundTrip send a new transaction to measure round trip latency
func (p *Provider) RoundTrip(ctx context.Context) { func (p *Provider) RoundTrip(ctx context.Context) {
log.Debug("roundTripLatency", "provider", p.name) log.Debug("roundTripLatency",
"provider", p.name)
client, err := iclients.Dial(p.name, p.config.URL) client, err := iclients.Dial(p.name, p.config.URL)
if err != nil { if err != nil {
log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) log.Error("cant dial to provider",
"provider", p.name,
"url", p.config.URL,
"err", err)
return return
} }
nonce, err := client.PendingNonceAt(ctx, p.walletConfig.Address) var nonce uint64
if err != nil { p.txPool.M.Lock()
log.Error("cant get nounce", "provider", p.name, "err", err) if p.txPool.Nonce == uint64(0) {
return nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address)
if err != nil {
log.Error("cant get nounce",
"provider", p.name,
"err", err)
p.txPool.M.Unlock()
return
}
p.txPool.Nonce = nonce
} else {
p.txPool.Nonce++
nonce = p.txPool.Nonce
} }
p.txPool.M.Unlock()
txHash := common.Hash{} txHash := common.Hash{}
attempt := 0 attempt := 0
@ -47,7 +63,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
signedTx, err := p.sign(ctx, tx) signedTx, err := p.sign(ctx, tx)
if err != nil { if err != nil {
log.Error("cant sign tx", "provider", p.name, "tx", tx, "err", err) log.Error("cant sign tx",
"provider", p.name,
"tx", tx,
"err", err)
return return
} }
@ -56,21 +75,40 @@ func (p *Provider) RoundTrip(ctx context.Context) {
roundTripStartedAt = time.Now() roundTripStartedAt = time.Now()
err = client.SendTransaction(ctx, signedTx) err = client.SendTransaction(ctx, signedTx)
if err != nil { if err != nil {
if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == core.ErrNonceTooLow.Error() { if err.Error() == txpool.ErrAlreadyKnown.Error() ||
err.Error() == txpool.ErrReplaceUnderpriced.Error() ||
err.Error() == core.ErrNonceTooLow.Error() {
if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) {
log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(firstAttemptAt), "attempt", attempt, "nonce", nonce) log.Error("send transaction timed out (known already)",
"provider", p.name,
"hash", txHash.Hex(),
"elapsed", time.Since(firstAttemptAt),
"attempt", attempt,
"nonce", nonce)
metrics.RecordError(p.name, "ethclient.SendTransaction.nonce") metrics.RecordError(p.name, "ethclient.SendTransaction.nonce")
return return
} }
log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce) log.Warn("tx already known, incrementing nonce and trying again",
"provider", p.name,
"nonce", nonce)
time.Sleep(time.Duration(p.config.SendTransactionRetryInterval)) time.Sleep(time.Duration(p.config.SendTransactionRetryInterval))
nonce++
p.txPool.M.Lock()
p.txPool.Nonce++
nonce = p.txPool.Nonce
p.txPool.M.Unlock()
attempt++ attempt++
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(firstAttemptAt)) log.Debug("retrying send transaction...",
"provider", p.name,
"attempt", attempt,
"nonce", nonce,
"elapsed", time.Since(firstAttemptAt))
} }
} else { } else {
log.Error("cant send transaction", "provider", p.name, "err", err) log.Error("cant send transaction",
"provider", p.name,
"err", err)
metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err) metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err)
return return
} }
@ -79,7 +117,10 @@ func (p *Provider) RoundTrip(ctx context.Context) {
} }
} }
log.Info("transaction sent", "provider", p.name, "hash", txHash.Hex(), "nonce", nonce) log.Info("transaction sent",
"provider", p.name,
"hash", txHash.Hex(),
"nonce", nonce)
// add to pool // add to pool
sentAt := time.Now() sentAt := time.Now()
@ -96,16 +137,25 @@ func (p *Provider) RoundTrip(ctx context.Context) {
attempt = 0 attempt = 0
for receipt == nil { for receipt == nil {
if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) { if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) {
log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt)) log.Error("receipt retrieval timed out",
"provider", p.name,
"hash", txHash,
"elapsed", time.Since(sentAt))
return return
} }
time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval))
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("checking for receipt...", "provider", p.name, "attempt", attempt, "elapsed", time.Since(sentAt)) log.Debug("checking for receipt...",
"provider", p.name,
"attempt", attempt,
"elapsed", time.Since(sentAt))
} }
receipt, err = client.TransactionReceipt(ctx, txHash) receipt, err = client.TransactionReceipt(ctx, txHash)
if err != nil && !errors.Is(err, ethereum.NotFound) { if err != nil && !errors.Is(err, ethereum.NotFound) {
log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), "err", err) log.Error("cant get receipt for transaction",
"provider", p.name,
"hash", txHash.Hex(),
"err", err)
return return
} }
attempt++ attempt++
@ -116,7 +166,8 @@ func (p *Provider) RoundTrip(ctx context.Context) {
metrics.RecordRoundTripLatency(p.name, roundTripLatency) metrics.RecordRoundTripLatency(p.name, roundTripLatency)
metrics.RecordGasUsed(p.name, receipt.GasUsed) metrics.RecordGasUsed(p.name, receipt.GasUsed)
log.Info("got transaction receipt", "hash", txHash.Hex(), log.Info("got transaction receipt",
"hash", txHash.Hex(),
"roundTripLatency", roundTripLatency, "roundTripLatency", roundTripLatency,
"provider", p.name, "provider", p.name,
"blockNumber", receipt.BlockNumber, "blockNumber", receipt.BlockNumber,
@ -155,7 +206,9 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
TLSKey: p.signerConfig.TLSKey, TLSKey: p.signerConfig.TLSKey,
} }
client, err := iclients.NewSignerClient(p.name, log.Root(), p.signerConfig.URL, tlsConfig) client, err := iclients.NewSignerClient(p.name, log.Root(), p.signerConfig.URL, tlsConfig)
log.Debug("signerclient", "client", client, "err", err) log.Debug("signerclient",
"client", client,
"err", err)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -15,6 +15,7 @@ type NetworkTransactionPool struct {
M sync.Mutex M sync.Mutex
Transactions map[string]*TransactionState Transactions map[string]*TransactionState
Expected int Expected int
Nonce uint64
} }
type TransactionState struct { type TransactionState struct {

@ -32,10 +32,12 @@ func (s *Service) Start(ctx context.Context) {
log.Info("service starting") log.Info("service starting")
if s.Config.Healthz.Enabled { if s.Config.Healthz.Enabled {
addr := net.JoinHostPort(s.Config.Healthz.Host, s.Config.Healthz.Port) addr := net.JoinHostPort(s.Config.Healthz.Host, s.Config.Healthz.Port)
log.Info("starting healthz server", "addr", addr) log.Info("starting healthz server",
"addr", addr)
go func() { go func() {
if err := s.Healthz.Start(ctx, addr); err != nil { if err := s.Healthz.Start(ctx, addr); err != nil {
log.Error("error starting healthz server", "err", err) log.Error("error starting healthz server",
"err", err)
} }
}() }()
} }
@ -43,10 +45,12 @@ func (s *Service) Start(ctx context.Context) {
metrics.Debug = s.Config.Metrics.Debug metrics.Debug = s.Config.Metrics.Debug
if s.Config.Metrics.Enabled { if s.Config.Metrics.Enabled {
addr := net.JoinHostPort(s.Config.Metrics.Host, s.Config.Metrics.Port) addr := net.JoinHostPort(s.Config.Metrics.Host, s.Config.Metrics.Port)
log.Info("starting metrics server", "addr", addr) log.Info("starting metrics server",
"addr", addr)
go func() { go func() {
if err := s.Metrics.Start(ctx, addr); err != nil { if err := s.Metrics.Start(ctx, addr); err != nil {
log.Error("error starting metrics server", "err", err) log.Error("error starting metrics server",
"err", err)
} }
}() }()
} }
@ -60,7 +64,8 @@ func (s *Service) Start(ctx context.Context) {
txpool := &provider.TransactionPool{} txpool := &provider.TransactionPool{}
for name, providers := range networks { for name, providers := range networks {
if len(providers) == 1 { if len(providers) == 1 {
log.Warn("can't measure first seen for network, please another provider", "network", name) log.Warn("can't measure first seen for network, please another provider",
"network", name)
} }
(*txpool)[name] = &provider.NetworkTransactionPool{} (*txpool)[name] = &provider.NetworkTransactionPool{}
(*txpool)[name].Transactions = make(map[string]*provider.TransactionState) (*txpool)[name].Transactions = make(map[string]*provider.TransactionState)
@ -76,7 +81,8 @@ func (s *Service) Start(ctx context.Context) {
s.Config.Wallets[providerConfig.Wallet], s.Config.Wallets[providerConfig.Wallet],
(*txpool)[providerConfig.Network]) (*txpool)[providerConfig.Network])
s.Providers[name].Start(ctx) s.Providers[name].Start(ctx)
log.Info("provider started", "provider", name) log.Info("provider started",
"provider", name)
} }
log.Info("service started") log.Info("service started")
@ -94,7 +100,8 @@ func (s *Service) Shutdown() {
} }
for name, provider := range s.Providers { for name, provider := range s.Providers {
provider.Shutdown() provider.Shutdown()
log.Info("provider stopped", "provider", name) log.Info("provider stopped",
"provider", name)
} }
log.Info("service stopped") log.Info("service stopped")
} }