diff --git a/op-ufm/op-ufm/Dockerfile b/op-ufm/op-ufm/Dockerfile index ad87f21..db9b84a 100644 --- a/op-ufm/op-ufm/Dockerfile +++ b/op-ufm/op-ufm/Dockerfile @@ -18,9 +18,10 @@ COPY --from=builder /app/entrypoint.sh /bin/entrypoint.sh COPY --from=builder /app/bin/ufm /bin/ufm RUN apk update && \ - apk add ca-certificates && \ chmod +x /bin/entrypoint.sh +RUN apk add ca-certificates jq curl bind-tools + VOLUME /etc/ufm EXPOSE 8080 diff --git a/op-ufm/op-ufm/example.config.toml b/op-ufm/op-ufm/example.config.toml index 13e4cd5..13b62a3 100644 --- a/op-ufm/op-ufm/example.config.toml +++ b/op-ufm/op-ufm/example.config.toml @@ -39,12 +39,6 @@ address = "0x0000000000000000000000000000000000000000" private_key = "0000000000000000000000000000000000000000000000000000000000000000" # Transaction value in wei tx_value = 100000000000000 -# Gas limit -gas_limit = 21000 -# Gas tip cap -gas_tip_cap = 2000000000 -# Fee cap -gas_fee_cap = 20000000000 [providers.p1] # URL to the RPC provider @@ -52,13 +46,15 @@ url = "http://localhost:8551" # Read only providers are only used to check for transactions read_only = true # Interval to poll the provider for expected transactions -read_interval = "1s" +read_interval = "10s" # Interval to submit new transactions to the provider -send_interval = "5s" -# Wallet to be used for sending transactions -wallet = "default" -# Network to pool transactions, i.e. providers in the same network will check transactions from each other -network = "op-goerli" +send_interval = "30s" +# Interval between send transaction when we get "already known" txpool err +send_transaction_retry_interval = "100ms" +# Max time to retry +send_transaction_retry_timeout = "5s" +# Interval between each send transaction to the same network +send_transaction_cool_down = "30s" # Interval between receipt retrieval receipt_retrieval_interval = "500ms" # Max time to check for receipt @@ -72,13 +68,15 @@ url = "http://localhost:8552" # Read only providers are only used to check for transactions read_only = false # Interval to poll the provider for expected transactions -read_interval = "2s" +read_interval = "10s" # Interval to submit new transactions to the provider -send_interval = "3s" -# Wallet to be used for sending transactions -wallet = "default" -# Network to pool transactions, i.e. providers in the same network will check transactions from each other -network = "op-goerli" +send_interval = "30s" +# Interval between send transaction when we get "already known" txpool err +send_transaction_retry_interval = "100ms" +# Max time to retry +send_transaction_retry_timeout = "5s" +# Interval between each send transaction to the same network +send_transaction_cool_down = "30s" # Interval between receipt retrieval receipt_retrieval_interval = "500ms" # Max time to check for receipt diff --git a/op-ufm/op-ufm/pkg/config/config.go b/op-ufm/op-ufm/pkg/config/config.go index 39c96ce..e0c80d6 100644 --- a/op-ufm/op-ufm/pkg/config/config.go +++ b/op-ufm/op-ufm/pkg/config/config.go @@ -48,10 +48,7 @@ type WalletConfig struct { PrivateKey string `toml:"private_key"` // transaction parameters - TxValue big.Int `toml:"tx_value"` - GasLimit uint64 `toml:"gas_limit"` - GasTipCap big.Int `toml:"gas_tip_cap"` - GasFeeCap big.Int `toml:"gas_fee_cap"` + TxValue big.Int `toml:"tx_value"` } type ProviderConfig struct { @@ -64,6 +61,7 @@ type ProviderConfig struct { SendInterval TOMLDuration `toml:"send_interval"` SendTransactionRetryInterval TOMLDuration `toml:"send_transaction_retry_interval"` SendTransactionRetryTimeout TOMLDuration `toml:"send_transaction_retry_timeout"` + SendTransactionCoolDown TOMLDuration `toml:"send_transaction_cool_down"` ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"` ReceiptRetrievalTimeout TOMLDuration `toml:"receipt_retrieval_timeout"` @@ -130,12 +128,6 @@ func (c *Config) Validate() error { if wallet.TxValue.BitLen() == 0 { return errors.Errorf("wallet [%s] tx_value is missing", name) } - if wallet.GasLimit == 0 { - return errors.Errorf("wallet [%s] gas_limit is missing", name) - } - if wallet.GasFeeCap.BitLen() == 0 { - return errors.Errorf("wallet [%s] gas_fee_cap is missing", name) - } } for name, provider := range c.Providers { @@ -154,6 +146,9 @@ func (c *Config) Validate() error { if provider.SendTransactionRetryTimeout == 0 { return errors.Errorf("provider [%s] send_transaction_retry_timeout is missing", name) } + if provider.SendTransactionCoolDown == 0 { + return errors.Errorf("provider [%s] send_transaction_cool_down is missing", name) + } if provider.ReceiptRetrievalInterval == 0 { return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", name) } diff --git a/op-ufm/op-ufm/pkg/metrics/clients/eth.go b/op-ufm/op-ufm/pkg/metrics/clients/eth.go index 6986762..92d7c25 100644 --- a/op-ufm/op-ufm/pkg/metrics/clients/eth.go +++ b/op-ufm/op-ufm/pkg/metrics/clients/eth.go @@ -2,6 +2,7 @@ package clients import ( "context" + "math/big" "time" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" @@ -22,7 +23,7 @@ func Dial(providerName string, url string) (*InstrumentedEthClient, error) { start := time.Now() c, err := ethclient.Dial(url) if err != nil { - metrics.RecordError(providerName, "ethclient.Dial") + metrics.RecordErrorDetails(providerName, "ethclient.Dial", err) return nil, err } metrics.RecordRPCLatency(providerName, "ethclient", "Dial", time.Since(start)) @@ -34,7 +35,7 @@ func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash comm tx, isPending, err := i.c.TransactionByHash(ctx, hash) if err != nil { if !i.ignorableErrors(err) { - metrics.RecordError(i.providerName, "ethclient.TransactionByHash") + metrics.RecordErrorDetails(i.providerName, "ethclient.TransactionByHash", err) } return nil, false, err } @@ -46,7 +47,7 @@ func (i *InstrumentedEthClient) PendingNonceAt(ctx context.Context, address stri start := time.Now() nonce, err := i.c.PendingNonceAt(ctx, common.HexToAddress(address)) if err != nil { - metrics.RecordError(i.providerName, "ethclient.PendingNonceAt") + metrics.RecordErrorDetails(i.providerName, "ethclient.PendingNonceAt", err) return 0, err } metrics.RecordRPCLatency(i.providerName, "ethclient", "PendingNonceAt", time.Since(start)) @@ -58,7 +59,7 @@ func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash c receipt, err := i.c.TransactionReceipt(ctx, txHash) if err != nil { if !i.ignorableErrors(err) { - metrics.RecordError(i.providerName, "ethclient.TransactionReceipt") + metrics.RecordErrorDetails(i.providerName, "ethclient.TransactionReceipt", err) } return nil, err } @@ -71,7 +72,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T err := i.c.SendTransaction(ctx, tx) if err != nil { if !i.ignorableErrors(err) { - metrics.RecordError(i.providerName, "ethclient.SendTransaction") + metrics.RecordErrorDetails(i.providerName, "ethclient.SendTransaction", err) } return err } @@ -79,6 +80,39 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T return err } +func (i *InstrumentedEthClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + start := time.Now() + gas, err := i.c.EstimateGas(ctx, msg) + if err != nil { + metrics.RecordErrorDetails(i.providerName, "ethclient.EstimateGas", err) + return 0, err + } + metrics.RecordRPCLatency(i.providerName, "ethclient", "EstimateGas", time.Since(start)) + return gas, err +} + +func (i *InstrumentedEthClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + start := time.Now() + gasTipCap, err := i.c.SuggestGasTipCap(ctx) + if err != nil { + metrics.RecordErrorDetails(i.providerName, "ethclient.SuggestGasTipCap", err) + return nil, err + } + metrics.RecordRPCLatency(i.providerName, "ethclient", "SuggestGasTipCap", time.Since(start)) + return gasTipCap, err +} + +func (i *InstrumentedEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + start := time.Now() + header, err := i.c.HeaderByNumber(ctx, number) + if err != nil { + metrics.RecordErrorDetails(i.providerName, "ethclient.HeaderByNumber", err) + return nil, err + } + metrics.RecordRPCLatency(i.providerName, "ethclient", "HeaderByNumber", time.Since(start)) + return header, err +} + func (i *InstrumentedEthClient) ignorableErrors(err error) bool { msg := err.Error() // we dont use errors.Is because eth client actually uses errors.New, diff --git a/op-ufm/op-ufm/pkg/metrics/clients/signer.go b/op-ufm/op-ufm/pkg/metrics/clients/signer.go index 17a6cd2..54f1163 100644 --- a/op-ufm/op-ufm/pkg/metrics/clients/signer.go +++ b/op-ufm/op-ufm/pkg/metrics/clients/signer.go @@ -22,7 +22,7 @@ func NewSignerClient(providerName string, logger log.Logger, endpoint string, tl start := time.Now() c, err := signer.NewSignerClient(logger, endpoint, tlsConfig) if err != nil { - metrics.RecordError(providerName, "signer.NewSignerClient") + metrics.RecordErrorDetails(providerName, "signer.NewSignerClient", err) return nil, err } metrics.RecordRPCLatency(providerName, "signer", "NewSignerClient", time.Since(start)) @@ -33,7 +33,7 @@ func (i *InstrumentedSignerClient) SignTransaction(ctx context.Context, chainId start := time.Now() tx, err := i.c.SignTransaction(ctx, chainId, tx) if err != nil { - metrics.RecordError(i.providerName, "signer.SignTransaction") + metrics.RecordErrorDetails(i.providerName, "signer.SignTransaction", err) return nil, err } metrics.RecordRPCLatency(i.providerName, "signer", "SignTransaction", time.Since(start)) diff --git a/op-ufm/op-ufm/pkg/provider/roundtrip.go b/op-ufm/op-ufm/pkg/provider/roundtrip.go index f404f33..3145abc 100644 --- a/op-ufm/op-ufm/pkg/provider/roundtrip.go +++ b/op-ufm/op-ufm/pkg/provider/roundtrip.go @@ -2,6 +2,7 @@ package provider import ( "context" + "math/big" "time" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" @@ -21,7 +22,7 @@ import ( // RoundTrip send a new transaction to measure round trip latency func (p *Provider) RoundTrip(ctx context.Context) { - log.Debug("roundTripLatency", + log.Debug("RoundTrip", "provider", p.name) client, err := iclients.Dial(p.name, p.config.URL) @@ -33,33 +34,38 @@ func (p *Provider) RoundTrip(ctx context.Context) { return } - var nonce uint64 - p.txPool.M.Lock() - if p.txPool.Nonce == uint64(0) { - nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address) - if err != nil { - log.Error("cant get nounce", - "provider", p.name, - "err", err) - p.txPool.M.Unlock() - return - } - p.txPool.Nonce = nonce - } else { - p.txPool.Nonce++ - nonce = p.txPool.Nonce - } - p.txPool.M.Unlock() + p.txPool.ExclusiveSend.Lock() + defer p.txPool.ExclusiveSend.Unlock() txHash := common.Hash{} attempt := 0 + nonce := uint64(0) + // used for timeout firstAttemptAt := time.Now() // used for actual round trip time (disregard retry time) - roundTripStartedAt := time.Now() + var roundTripStartedAt time.Time for { - tx := p.createTx(nonce) - txHash = tx.Hash() + + // sleep until we get a clear to send + for { + coolDown := time.Duration(p.config.SendTransactionCoolDown) - time.Since(p.txPool.LastSend) + if coolDown > 0 { + time.Sleep(coolDown) + } else { + break + } + } + + tx, err := p.createTx(ctx, client, nonce) + nonce = tx.Nonce() + if err != nil { + log.Error("cant create tx", + "provider", p.name, + "nonce", nonce, + "err", err) + return + } signedTx, err := p.sign(ctx, tx) if err != nil { @@ -69,7 +75,6 @@ func (p *Provider) RoundTrip(ctx context.Context) { "err", err) return } - txHash = signedTx.Hash() roundTripStartedAt = time.Now() @@ -78,25 +83,29 @@ func (p *Provider) RoundTrip(ctx context.Context) { if err.Error() == txpool.ErrAlreadyKnown.Error() || err.Error() == txpool.ErrReplaceUnderpriced.Error() || err.Error() == core.ErrNonceTooLow.Error() { + + log.Warn("cant send transaction (retryable)", + "provider", p.name, + "err", err, + "nonce", nonce) + if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { log.Error("send transaction timed out (known already)", "provider", p.name, "hash", txHash.Hex(), + "nonce", nonce, "elapsed", time.Since(firstAttemptAt), - "attempt", attempt, - "nonce", nonce) - metrics.RecordError(p.name, "ethclient.SendTransaction.nonce") + "attempt", attempt) + metrics.RecordErrorDetails(p.name, "send.timeout", err) return } + log.Warn("tx already known, incrementing nonce and trying again", "provider", p.name, "nonce", nonce) time.Sleep(time.Duration(p.config.SendTransactionRetryInterval)) - p.txPool.M.Lock() - p.txPool.Nonce++ - nonce = p.txPool.Nonce - p.txPool.M.Unlock() + nonce++ attempt++ if attempt%10 == 0 { log.Debug("retrying send transaction...", @@ -108,6 +117,7 @@ func (p *Provider) RoundTrip(ctx context.Context) { } else { log.Error("cant send transaction", "provider", p.name, + "nonce", nonce, "err", err) metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err) return @@ -131,6 +141,7 @@ func (p *Provider) RoundTrip(ctx context.Context) { SentAt: sentAt, SeenBy: make(map[string]time.Time), } + p.txPool.LastSend = sentAt p.txPool.M.Unlock() var receipt *types.Receipt @@ -140,13 +151,17 @@ func (p *Provider) RoundTrip(ctx context.Context) { log.Error("receipt retrieval timed out", "provider", p.name, "hash", txHash, + "nonce", nonce, "elapsed", time.Since(sentAt)) + metrics.RecordErrorDetails(p.name, "receipt.timeout", err) return } time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) if attempt%10 == 0 { log.Debug("checking for receipt...", "provider", p.name, + "hash", txHash, + "nonce", nonce, "attempt", attempt, "elapsed", time.Since(sentAt)) } @@ -155,6 +170,7 @@ func (p *Provider) RoundTrip(ctx context.Context) { log.Error("cant get receipt for transaction", "provider", p.name, "hash", txHash.Hex(), + "nonce", nonce, "err", err) return } @@ -168,6 +184,7 @@ func (p *Provider) RoundTrip(ctx context.Context) { log.Info("got transaction receipt", "hash", txHash.Hex(), + "nonce", nonce, "roundTripLatency", roundTripLatency, "provider", p.name, "blockNumber", receipt.BlockNumber, @@ -175,20 +192,83 @@ func (p *Provider) RoundTrip(ctx context.Context) { "gasUsed", receipt.GasUsed) } -func (p *Provider) createTx(nonce uint64) *types.Transaction { - toAddress := common.HexToAddress(p.walletConfig.Address) +func (p *Provider) createTx(ctx context.Context, client *iclients.InstrumentedEthClient, nonce uint64) (*types.Transaction, error) { + var err error + if nonce == 0 { + nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address) + if err != nil { + log.Error("cant get nounce", + "provider", p.name, + "nonce", nonce, + "err", err) + return nil, err + } + } + + gasTipCap, err := client.SuggestGasTipCap(ctx) + if err != nil { + log.Error("cant get gas tip cap", + "provider", p.name, + "err", err) + return nil, err + } + gasTipCap = new(big.Int).Mul(gasTipCap, big.NewInt(110)) + gasTipCap = new(big.Int).Div(gasTipCap, big.NewInt(100)) + + head, err := client.HeaderByNumber(ctx, nil) + if err != nil { + log.Error("cant get base fee from head", + "provider", p.name, + "err", err) + return nil, err + } + baseFee := head.BaseFee + + gasFeeCap := new(big.Int).Add( + gasTipCap, + new(big.Int).Mul(baseFee, big.NewInt(2))) + + addr := common.HexToAddress(p.walletConfig.Address) var data []byte - tx := types.NewTx(&types.DynamicFeeTx{ + dynamicTx := &types.DynamicFeeTx{ ChainID: &p.walletConfig.ChainID, Nonce: nonce, - GasFeeCap: &p.walletConfig.GasFeeCap, - GasTipCap: &p.walletConfig.GasTipCap, - Gas: p.walletConfig.GasLimit, - To: &toAddress, + GasFeeCap: gasFeeCap, + GasTipCap: gasTipCap, + To: &addr, Value: &p.walletConfig.TxValue, Data: data, + } + + gas, err := client.EstimateGas(ctx, ethereum.CallMsg{ + From: addr, + To: &addr, + GasFeeCap: gasFeeCap, + GasTipCap: gasTipCap, + Data: dynamicTx.Data, + Value: dynamicTx.Value, }) - return tx + if err != nil { + log.Error("cant estimate gas", + "provider", p.name, + "err", err) + return nil, err + } + dynamicTx.Gas = gas + tx := types.NewTx(dynamicTx) + + log.Info("tx created", + "provider", p.name, + "from", addr, + "to", dynamicTx.To, + "nonce", dynamicTx.Nonce, + "value", dynamicTx.Value, + "gas", dynamicTx.Gas, + "gasTipCap", dynamicTx.GasTipCap, + "gasFeeCap", dynamicTx.GasFeeCap, + ) + + return tx, nil } func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) { diff --git a/op-ufm/op-ufm/pkg/provider/tx_pool.go b/op-ufm/op-ufm/pkg/provider/tx_pool.go index af0c9b1..2ea5f6a 100644 --- a/op-ufm/op-ufm/pkg/provider/tx_pool.go +++ b/op-ufm/op-ufm/pkg/provider/tx_pool.go @@ -15,7 +15,11 @@ type NetworkTransactionPool struct { M sync.Mutex Transactions map[string]*TransactionState Expected int - Nonce uint64 + + // Last time a transaction was sent + LastSend time.Time + // Prevents concurrent transaction send + ExclusiveSend sync.Mutex } type TransactionState struct {