From 3e42245fa4364e95ee2fd40549903207d753dce8 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 31 Aug 2023 11:55:32 -0700 Subject: [PATCH 1/2] feat(ufm): track nonce locally --- op-ufm/op-ufm/pkg/provider/roundtrip.go | 30 +++++++++++++++++++------ op-ufm/op-ufm/pkg/provider/tx_pool.go | 1 + 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/op-ufm/op-ufm/pkg/provider/roundtrip.go b/op-ufm/op-ufm/pkg/provider/roundtrip.go index 4242d45..65dc33a 100644 --- a/op-ufm/op-ufm/pkg/provider/roundtrip.go +++ b/op-ufm/op-ufm/pkg/provider/roundtrip.go @@ -6,11 +6,11 @@ import ( "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" iclients "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum-optimism/optimism/op-service/tls" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/crypto" "github.com/pkg/errors" @@ -29,11 +29,21 @@ func (p *Provider) RoundTrip(ctx context.Context) { return } - nonce, err := client.PendingNonceAt(ctx, p.walletConfig.Address) - if err != nil { - log.Error("cant get nounce", "provider", p.name, "err", err) - return + var nonce uint64 + p.txPool.M.Lock() + if p.txPool.Nonce == uint64(0) { + nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address) + if err != nil { + log.Error("cant get nounce", "provider", p.name, "err", err) + p.txPool.M.Unlock() + return + } + p.txPool.Nonce = nonce + } else { + p.txPool.Nonce++ + nonce = p.txPool.Nonce } + p.txPool.M.Unlock() txHash := common.Hash{} attempt := 0 @@ -56,7 +66,9 @@ func (p *Provider) RoundTrip(ctx context.Context) { roundTripStartedAt = time.Now() err = client.SendTransaction(ctx, signedTx) if err != nil { - if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == core.ErrNonceTooLow.Error() { + if err.Error() == txpool.ErrAlreadyKnown.Error() || + err.Error() == txpool.ErrReplaceUnderpriced.Error() || + err.Error() == core.ErrNonceTooLow.Error() { if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(firstAttemptAt), "attempt", attempt, "nonce", nonce) metrics.RecordError(p.name, "ethclient.SendTransaction.nonce") @@ -64,7 +76,11 @@ func (p *Provider) RoundTrip(ctx context.Context) { } log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce) time.Sleep(time.Duration(p.config.SendTransactionRetryInterval)) - nonce++ + + p.txPool.M.Lock() + p.txPool.Nonce++ + nonce = p.txPool.Nonce + p.txPool.M.Unlock() attempt++ if attempt%10 == 0 { log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(firstAttemptAt)) diff --git a/op-ufm/op-ufm/pkg/provider/tx_pool.go b/op-ufm/op-ufm/pkg/provider/tx_pool.go index 5b28b34..af0c9b1 100644 --- a/op-ufm/op-ufm/pkg/provider/tx_pool.go +++ b/op-ufm/op-ufm/pkg/provider/tx_pool.go @@ -15,6 +15,7 @@ type NetworkTransactionPool struct { M sync.Mutex Transactions map[string]*TransactionState Expected int + Nonce uint64 } type TransactionState struct { From 5597c22668a58803e4f3699656e164878fcb7f2d Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Thu, 31 Aug 2023 13:31:34 -0700 Subject: [PATCH 2/2] format log ctx --- op-ufm/op-ufm/cmd/ufm/main.go | 18 +++++-- op-ufm/op-ufm/pkg/metrics/metrics.go | 46 +++++++++++------ op-ufm/op-ufm/pkg/provider/heartbeat.go | 35 ++++++++++--- op-ufm/op-ufm/pkg/provider/roundtrip.go | 65 +++++++++++++++++++------ op-ufm/op-ufm/pkg/service/service.go | 21 +++++--- 5 files changed, 138 insertions(+), 47 deletions(-) diff --git a/op-ufm/op-ufm/cmd/ufm/main.go b/op-ufm/op-ufm/cmd/ufm/main.go index b603843..ca2cac4 100644 --- a/op-ufm/op-ufm/cmd/ufm/main.go +++ b/op-ufm/op-ufm/cmd/ufm/main.go @@ -28,7 +28,10 @@ func main() { ), ) - log.Info("initializing", "version", GitVersion, "commit", GitCommit, "date", GitDate) + log.Info("initializing", + "version", GitVersion, + "commit", GitCommit, + "date", GitDate) if len(os.Args) < 2 { log.Crit("must specify a config file on the command line") @@ -42,7 +45,8 @@ func main() { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) recvSig := <-sig - log.Info("caught signal, shutting down", "signal", recvSig) + log.Info("caught signal, shutting down", + "signal", recvSig) svc.Shutdown() } @@ -50,7 +54,9 @@ func main() { func initConfig(cfgFile string) *config.Config { cfg, err := config.New(cfgFile) if err != nil { - log.Crit("error reading config file", "file", cfgFile, "err", err) + log.Crit("error reading config file", + "file", cfgFile, + "err", err) } // update log level from config @@ -58,7 +64,8 @@ func initConfig(cfgFile string) *config.Config { if err != nil { logLevel = log.LvlInfo if cfg.LogLevel != "" { - log.Warn("invalid server.log_level set: " + cfg.LogLevel) + log.Warn("invalid server.log_level", + "log_level", cfg.LogLevel) } } log.Root().SetHandler( @@ -74,7 +81,8 @@ func initConfig(cfgFile string) *config.Config { err = cfg.Validate() if err != nil { - log.Crit("invalid config", "err", err) + log.Crit("invalid config", + "err", err) } return cfg diff --git a/op-ufm/op-ufm/pkg/metrics/metrics.go b/op-ufm/op-ufm/pkg/metrics/metrics.go index 762082c..6c8a64a 100644 --- a/op-ufm/op-ufm/pkg/metrics/metrics.go +++ b/op-ufm/op-ufm/pkg/metrics/metrics.go @@ -84,8 +84,10 @@ var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z ]+`) func RecordError(provider string, errorLabel string) { if Debug { - log.Debug("metric inc", "m", "errors_total", - "provider", provider, "error", errorLabel) + log.Debug("metric inc", + "m", "errors_total", + "provider", provider, + "error", errorLabel) } errorsTotal.WithLabelValues(provider, errorLabel).Inc() } @@ -101,48 +103,64 @@ func RecordErrorDetails(provider string, label string, err error) { func RecordRPCLatency(provider string, client string, method string, latency time.Duration) { if Debug { - log.Debug("metric set", "m", "rpc_latency", - "provider", provider, "client", client, "method", method, "latency", latency) + log.Debug("metric set", + "m", "rpc_latency", + "provider", provider, + "client", client, + "method", method, + "latency", latency) } rpcLatency.WithLabelValues(provider, client, method).Set(float64(latency.Milliseconds())) } func RecordRoundTripLatency(provider string, latency time.Duration) { if Debug { - log.Debug("metric set", "m", "roundtrip_latency", - "provider", provider, "latency", latency) + log.Debug("metric set", + "m", "roundtrip_latency", + "provider", provider, + "latency", latency) } roundTripLatency.WithLabelValues(provider).Set(float64(latency.Milliseconds())) } func RecordGasUsed(provider string, val uint64) { if Debug { - log.Debug("metric add", "m", "gas_used", - "provider", provider, "val", val) + log.Debug("metric add", + "m", "gas_used", + "provider", provider, + "val", val) } gasUsed.WithLabelValues(provider).Set(float64(val)) } func RecordFirstSeenLatency(providerSource string, providerSeen string, latency time.Duration) { if Debug { - log.Debug("metric set", "m", "first_seen_latency", - "provider_source", providerSource, "provider_seen", providerSeen, "latency", latency) + log.Debug("metric set", + "m", "first_seen_latency", + "provider_source", providerSource, + "provider_seen", providerSeen, + "latency", latency) } firstSeenLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds())) } func RecordProviderToProviderLatency(providerSource string, providerSeen string, latency time.Duration) { if Debug { - log.Debug("metric set", "m", "provider_to_provider_latency", - "provider_source", providerSource, "provider_seen", providerSeen, "latency", latency) + log.Debug("metric set", + "m", "provider_to_provider_latency", + "provider_source", providerSource, + "provider_seen", providerSeen, + "latency", latency) } providerToProviderLatency.WithLabelValues(providerSource, providerSeen).Set(float64(latency.Milliseconds())) } func RecordTransactionsInFlight(network string, count int) { if Debug { - log.Debug("metric set", "m", "transactions_inflight", - "network", network, "count", count) + log.Debug("metric set", + "m", "transactions_inflight", + "network", network, + "count", count) } networkTransactionsInFlight.WithLabelValues(network).Set(float64(count)) } diff --git a/op-ufm/op-ufm/pkg/provider/heartbeat.go b/op-ufm/op-ufm/pkg/provider/heartbeat.go index 163aaa4..ddc6ed2 100644 --- a/op-ufm/op-ufm/pkg/provider/heartbeat.go +++ b/op-ufm/op-ufm/pkg/provider/heartbeat.go @@ -14,7 +14,9 @@ import ( // Heartbeat polls for expected in-flight transactions func (p *Provider) Heartbeat(ctx context.Context) { - log.Debug("heartbeat", "provider", p.name, "inflight", len(p.txPool.Transactions)) + log.Debug("heartbeat", + "provider", p.name, + "count", len(p.txPool.Transactions)) metrics.RecordTransactionsInFlight(p.config.Network, len(p.txPool.Transactions)) @@ -33,26 +35,42 @@ func (p *Provider) Heartbeat(ctx context.Context) { } if len(expectedTransactions) == 0 { - log.Debug("no expected txs", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen) + log.Debug("no expected txs", + "count", len(p.txPool.Transactions), + "provider", p.name, + "alreadySeen", alreadySeen) return } client, err := clients.Dial(p.name, p.config.URL) if err != nil { - log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) + log.Error("cant dial to provider", + "provider", p.name, + "url", p.config.URL, + "err", err) } - log.Debug("checking in-flight tx", "count", len(p.txPool.Transactions), "provider", p.name, "alreadySeen", alreadySeen) + log.Debug("checking in-flight tx", + "count", len(p.txPool.Transactions), + "provider", p.name, + "alreadySeen", alreadySeen) for _, st := range expectedTransactions { hash := st.Hash.Hex() _, isPending, err := client.TransactionByHash(ctx, st.Hash) if err != nil && !errors.Is(err, ethereum.NotFound) { - log.Error("cant check transaction", "provider", p.name, "hash", hash, "url", p.config.URL, "err", err) + log.Error("cant check transaction", + "provider", p.name, + "hash", hash, + "url", p.config.URL, + "err", err) continue } - log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending) + log.Debug("got transaction", + "provider", p.name, + "hash", hash, + "isPending", isPending) // mark transaction as seen by this provider st.M.Lock() @@ -75,7 +93,10 @@ func (p *Provider) Heartbeat(ctx context.Context) { // check if transaction have been seen by all providers p.txPool.M.Lock() if len(st.SeenBy) == p.txPool.Expected { - log.Debug("transaction seen by all", "hash", hash, "expected", p.txPool.Expected, "seenBy", len(st.SeenBy)) + log.Debug("transaction seen by all", + "hash", hash, + "expected", p.txPool.Expected, + "seenBy", len(st.SeenBy)) delete(p.txPool.Transactions, st.Hash.Hex()) } p.txPool.M.Unlock() diff --git a/op-ufm/op-ufm/pkg/provider/roundtrip.go b/op-ufm/op-ufm/pkg/provider/roundtrip.go index 65dc33a..f404f33 100644 --- a/op-ufm/op-ufm/pkg/provider/roundtrip.go +++ b/op-ufm/op-ufm/pkg/provider/roundtrip.go @@ -21,11 +21,15 @@ import ( // RoundTrip send a new transaction to measure round trip latency func (p *Provider) RoundTrip(ctx context.Context) { - log.Debug("roundTripLatency", "provider", p.name) + log.Debug("roundTripLatency", + "provider", p.name) client, err := iclients.Dial(p.name, p.config.URL) if err != nil { - log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) + log.Error("cant dial to provider", + "provider", p.name, + "url", p.config.URL, + "err", err) return } @@ -34,7 +38,9 @@ func (p *Provider) RoundTrip(ctx context.Context) { if p.txPool.Nonce == uint64(0) { nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address) if err != nil { - log.Error("cant get nounce", "provider", p.name, "err", err) + log.Error("cant get nounce", + "provider", p.name, + "err", err) p.txPool.M.Unlock() return } @@ -57,7 +63,10 @@ func (p *Provider) RoundTrip(ctx context.Context) { signedTx, err := p.sign(ctx, tx) if err != nil { - log.Error("cant sign tx", "provider", p.name, "tx", tx, "err", err) + log.Error("cant sign tx", + "provider", p.name, + "tx", tx, + "err", err) return } @@ -70,11 +79,18 @@ func (p *Provider) RoundTrip(ctx context.Context) { err.Error() == txpool.ErrReplaceUnderpriced.Error() || err.Error() == core.ErrNonceTooLow.Error() { if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { - log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), "elapsed", time.Since(firstAttemptAt), "attempt", attempt, "nonce", nonce) + log.Error("send transaction timed out (known already)", + "provider", p.name, + "hash", txHash.Hex(), + "elapsed", time.Since(firstAttemptAt), + "attempt", attempt, + "nonce", nonce) metrics.RecordError(p.name, "ethclient.SendTransaction.nonce") return } - log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce) + log.Warn("tx already known, incrementing nonce and trying again", + "provider", p.name, + "nonce", nonce) time.Sleep(time.Duration(p.config.SendTransactionRetryInterval)) p.txPool.M.Lock() @@ -83,10 +99,16 @@ func (p *Provider) RoundTrip(ctx context.Context) { p.txPool.M.Unlock() attempt++ if attempt%10 == 0 { - log.Debug("retrying send transaction...", "provider", p.name, "attempt", attempt, "nonce", nonce, "elapsed", time.Since(firstAttemptAt)) + log.Debug("retrying send transaction...", + "provider", p.name, + "attempt", attempt, + "nonce", nonce, + "elapsed", time.Since(firstAttemptAt)) } } else { - log.Error("cant send transaction", "provider", p.name, "err", err) + log.Error("cant send transaction", + "provider", p.name, + "err", err) metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err) return } @@ -95,7 +117,10 @@ func (p *Provider) RoundTrip(ctx context.Context) { } } - log.Info("transaction sent", "provider", p.name, "hash", txHash.Hex(), "nonce", nonce) + log.Info("transaction sent", + "provider", p.name, + "hash", txHash.Hex(), + "nonce", nonce) // add to pool sentAt := time.Now() @@ -112,16 +137,25 @@ func (p *Provider) RoundTrip(ctx context.Context) { attempt = 0 for receipt == nil { if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) { - log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt)) + log.Error("receipt retrieval timed out", + "provider", p.name, + "hash", txHash, + "elapsed", time.Since(sentAt)) return } time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) if attempt%10 == 0 { - log.Debug("checking for receipt...", "provider", p.name, "attempt", attempt, "elapsed", time.Since(sentAt)) + log.Debug("checking for receipt...", + "provider", p.name, + "attempt", attempt, + "elapsed", time.Since(sentAt)) } receipt, err = client.TransactionReceipt(ctx, txHash) if err != nil && !errors.Is(err, ethereum.NotFound) { - log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), "err", err) + log.Error("cant get receipt for transaction", + "provider", p.name, + "hash", txHash.Hex(), + "err", err) return } attempt++ @@ -132,7 +166,8 @@ func (p *Provider) RoundTrip(ctx context.Context) { metrics.RecordRoundTripLatency(p.name, roundTripLatency) metrics.RecordGasUsed(p.name, receipt.GasUsed) - log.Info("got transaction receipt", "hash", txHash.Hex(), + log.Info("got transaction receipt", + "hash", txHash.Hex(), "roundTripLatency", roundTripLatency, "provider", p.name, "blockNumber", receipt.BlockNumber, @@ -171,7 +206,9 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran TLSKey: p.signerConfig.TLSKey, } client, err := iclients.NewSignerClient(p.name, log.Root(), p.signerConfig.URL, tlsConfig) - log.Debug("signerclient", "client", client, "err", err) + log.Debug("signerclient", + "client", client, + "err", err) if err != nil { return nil, err } diff --git a/op-ufm/op-ufm/pkg/service/service.go b/op-ufm/op-ufm/pkg/service/service.go index 04b4f6d..6580de8 100644 --- a/op-ufm/op-ufm/pkg/service/service.go +++ b/op-ufm/op-ufm/pkg/service/service.go @@ -32,10 +32,12 @@ func (s *Service) Start(ctx context.Context) { log.Info("service starting") if s.Config.Healthz.Enabled { addr := net.JoinHostPort(s.Config.Healthz.Host, s.Config.Healthz.Port) - log.Info("starting healthz server", "addr", addr) + log.Info("starting healthz server", + "addr", addr) go func() { if err := s.Healthz.Start(ctx, addr); err != nil { - log.Error("error starting healthz server", "err", err) + log.Error("error starting healthz server", + "err", err) } }() } @@ -43,10 +45,12 @@ func (s *Service) Start(ctx context.Context) { metrics.Debug = s.Config.Metrics.Debug if s.Config.Metrics.Enabled { addr := net.JoinHostPort(s.Config.Metrics.Host, s.Config.Metrics.Port) - log.Info("starting metrics server", "addr", addr) + log.Info("starting metrics server", + "addr", addr) go func() { if err := s.Metrics.Start(ctx, addr); err != nil { - log.Error("error starting metrics server", "err", err) + log.Error("error starting metrics server", + "err", err) } }() } @@ -60,7 +64,8 @@ func (s *Service) Start(ctx context.Context) { txpool := &provider.TransactionPool{} for name, providers := range networks { if len(providers) == 1 { - log.Warn("can't measure first seen for network, please another provider", "network", name) + log.Warn("can't measure first seen for network, please another provider", + "network", name) } (*txpool)[name] = &provider.NetworkTransactionPool{} (*txpool)[name].Transactions = make(map[string]*provider.TransactionState) @@ -76,7 +81,8 @@ func (s *Service) Start(ctx context.Context) { s.Config.Wallets[providerConfig.Wallet], (*txpool)[providerConfig.Network]) s.Providers[name].Start(ctx) - log.Info("provider started", "provider", name) + log.Info("provider started", + "provider", name) } log.Info("service started") @@ -94,7 +100,8 @@ func (s *Service) Shutdown() { } for name, provider := range s.Providers { provider.Shutdown() - log.Info("provider stopped", "provider", name) + log.Info("provider stopped", + "provider", name) } log.Info("service stopped") }