Merge pull request #7132 from ethereum-optimism/felipe/cooldown-b

feat(ufm): add cooldown, always use network gas and nonce, better error metrics
This commit is contained in:
OptimismBot 2023-09-08 12:56:32 -04:00 committed by GitHub
commit a0c866be83
7 changed files with 186 additions and 74 deletions

@ -18,9 +18,10 @@ COPY --from=builder /app/entrypoint.sh /bin/entrypoint.sh
COPY --from=builder /app/bin/ufm /bin/ufm COPY --from=builder /app/bin/ufm /bin/ufm
RUN apk update && \ RUN apk update && \
apk add ca-certificates && \
chmod +x /bin/entrypoint.sh chmod +x /bin/entrypoint.sh
RUN apk add ca-certificates jq curl bind-tools
VOLUME /etc/ufm VOLUME /etc/ufm
EXPOSE 8080 EXPOSE 8080

@ -39,12 +39,6 @@ address = "0x0000000000000000000000000000000000000000"
private_key = "0000000000000000000000000000000000000000000000000000000000000000" private_key = "0000000000000000000000000000000000000000000000000000000000000000"
# Transaction value in wei # Transaction value in wei
tx_value = 100000000000000 tx_value = 100000000000000
# Gas limit
gas_limit = 21000
# Gas tip cap
gas_tip_cap = 2000000000
# Fee cap
gas_fee_cap = 20000000000
[providers.p1] [providers.p1]
# URL to the RPC provider # URL to the RPC provider
@ -52,13 +46,15 @@ url = "http://localhost:8551"
# Read only providers are only used to check for transactions # Read only providers are only used to check for transactions
read_only = true read_only = true
# Interval to poll the provider for expected transactions # Interval to poll the provider for expected transactions
read_interval = "1s" read_interval = "10s"
# Interval to submit new transactions to the provider # Interval to submit new transactions to the provider
send_interval = "5s" send_interval = "30s"
# Wallet to be used for sending transactions # Interval between send transaction when we get "already known" txpool err
wallet = "default" send_transaction_retry_interval = "100ms"
# Network to pool transactions, i.e. providers in the same network will check transactions from each other # Max time to retry
network = "op-goerli" send_transaction_retry_timeout = "5s"
# Interval between each send transaction to the same network
send_transaction_cool_down = "30s"
# Interval between receipt retrieval # Interval between receipt retrieval
receipt_retrieval_interval = "500ms" receipt_retrieval_interval = "500ms"
# Max time to check for receipt # Max time to check for receipt
@ -72,13 +68,15 @@ url = "http://localhost:8552"
# Read only providers are only used to check for transactions # Read only providers are only used to check for transactions
read_only = false read_only = false
# Interval to poll the provider for expected transactions # Interval to poll the provider for expected transactions
read_interval = "2s" read_interval = "10s"
# Interval to submit new transactions to the provider # Interval to submit new transactions to the provider
send_interval = "3s" send_interval = "30s"
# Wallet to be used for sending transactions # Interval between send transaction when we get "already known" txpool err
wallet = "default" send_transaction_retry_interval = "100ms"
# Network to pool transactions, i.e. providers in the same network will check transactions from each other # Max time to retry
network = "op-goerli" send_transaction_retry_timeout = "5s"
# Interval between each send transaction to the same network
send_transaction_cool_down = "30s"
# Interval between receipt retrieval # Interval between receipt retrieval
receipt_retrieval_interval = "500ms" receipt_retrieval_interval = "500ms"
# Max time to check for receipt # Max time to check for receipt

@ -48,10 +48,7 @@ type WalletConfig struct {
PrivateKey string `toml:"private_key"` PrivateKey string `toml:"private_key"`
// transaction parameters // transaction parameters
TxValue big.Int `toml:"tx_value"` TxValue big.Int `toml:"tx_value"`
GasLimit uint64 `toml:"gas_limit"`
GasTipCap big.Int `toml:"gas_tip_cap"`
GasFeeCap big.Int `toml:"gas_fee_cap"`
} }
type ProviderConfig struct { type ProviderConfig struct {
@ -64,6 +61,7 @@ type ProviderConfig struct {
SendInterval TOMLDuration `toml:"send_interval"` SendInterval TOMLDuration `toml:"send_interval"`
SendTransactionRetryInterval TOMLDuration `toml:"send_transaction_retry_interval"` SendTransactionRetryInterval TOMLDuration `toml:"send_transaction_retry_interval"`
SendTransactionRetryTimeout TOMLDuration `toml:"send_transaction_retry_timeout"` SendTransactionRetryTimeout TOMLDuration `toml:"send_transaction_retry_timeout"`
SendTransactionCoolDown TOMLDuration `toml:"send_transaction_cool_down"`
ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"` ReceiptRetrievalInterval TOMLDuration `toml:"receipt_retrieval_interval"`
ReceiptRetrievalTimeout TOMLDuration `toml:"receipt_retrieval_timeout"` ReceiptRetrievalTimeout TOMLDuration `toml:"receipt_retrieval_timeout"`
@ -130,12 +128,6 @@ func (c *Config) Validate() error {
if wallet.TxValue.BitLen() == 0 { if wallet.TxValue.BitLen() == 0 {
return errors.Errorf("wallet [%s] tx_value is missing", name) return errors.Errorf("wallet [%s] tx_value is missing", name)
} }
if wallet.GasLimit == 0 {
return errors.Errorf("wallet [%s] gas_limit is missing", name)
}
if wallet.GasFeeCap.BitLen() == 0 {
return errors.Errorf("wallet [%s] gas_fee_cap is missing", name)
}
} }
for name, provider := range c.Providers { for name, provider := range c.Providers {
@ -154,6 +146,9 @@ func (c *Config) Validate() error {
if provider.SendTransactionRetryTimeout == 0 { if provider.SendTransactionRetryTimeout == 0 {
return errors.Errorf("provider [%s] send_transaction_retry_timeout is missing", name) return errors.Errorf("provider [%s] send_transaction_retry_timeout is missing", name)
} }
if provider.SendTransactionCoolDown == 0 {
return errors.Errorf("provider [%s] send_transaction_cool_down is missing", name)
}
if provider.ReceiptRetrievalInterval == 0 { if provider.ReceiptRetrievalInterval == 0 {
return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", name) return errors.Errorf("provider [%s] receipt_retrieval_interval is missing", name)
} }

@ -2,6 +2,7 @@ package clients
import ( import (
"context" "context"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
@ -22,7 +23,7 @@ func Dial(providerName string, url string) (*InstrumentedEthClient, error) {
start := time.Now() start := time.Now()
c, err := ethclient.Dial(url) c, err := ethclient.Dial(url)
if err != nil { if err != nil {
metrics.RecordError(providerName, "ethclient.Dial") metrics.RecordErrorDetails(providerName, "ethclient.Dial", err)
return nil, err return nil, err
} }
metrics.RecordRPCLatency(providerName, "ethclient", "Dial", time.Since(start)) metrics.RecordRPCLatency(providerName, "ethclient", "Dial", time.Since(start))
@ -34,7 +35,7 @@ func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash comm
tx, isPending, err := i.c.TransactionByHash(ctx, hash) tx, isPending, err := i.c.TransactionByHash(ctx, hash)
if err != nil { if err != nil {
if !i.ignorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionByHash") metrics.RecordErrorDetails(i.providerName, "ethclient.TransactionByHash", err)
} }
return nil, false, err return nil, false, err
} }
@ -46,7 +47,7 @@ func (i *InstrumentedEthClient) PendingNonceAt(ctx context.Context, address stri
start := time.Now() start := time.Now()
nonce, err := i.c.PendingNonceAt(ctx, common.HexToAddress(address)) nonce, err := i.c.PendingNonceAt(ctx, common.HexToAddress(address))
if err != nil { if err != nil {
metrics.RecordError(i.providerName, "ethclient.PendingNonceAt") metrics.RecordErrorDetails(i.providerName, "ethclient.PendingNonceAt", err)
return 0, err return 0, err
} }
metrics.RecordRPCLatency(i.providerName, "ethclient", "PendingNonceAt", time.Since(start)) metrics.RecordRPCLatency(i.providerName, "ethclient", "PendingNonceAt", time.Since(start))
@ -58,7 +59,7 @@ func (i *InstrumentedEthClient) TransactionReceipt(ctx context.Context, txHash c
receipt, err := i.c.TransactionReceipt(ctx, txHash) receipt, err := i.c.TransactionReceipt(ctx, txHash)
if err != nil { if err != nil {
if !i.ignorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionReceipt") metrics.RecordErrorDetails(i.providerName, "ethclient.TransactionReceipt", err)
} }
return nil, err return nil, err
} }
@ -71,7 +72,7 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
err := i.c.SendTransaction(ctx, tx) err := i.c.SendTransaction(ctx, tx)
if err != nil { if err != nil {
if !i.ignorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.SendTransaction") metrics.RecordErrorDetails(i.providerName, "ethclient.SendTransaction", err)
} }
return err return err
} }
@ -79,6 +80,39 @@ func (i *InstrumentedEthClient) SendTransaction(ctx context.Context, tx *types.T
return err return err
} }
func (i *InstrumentedEthClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
start := time.Now()
gas, err := i.c.EstimateGas(ctx, msg)
if err != nil {
metrics.RecordErrorDetails(i.providerName, "ethclient.EstimateGas", err)
return 0, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "EstimateGas", time.Since(start))
return gas, err
}
func (i *InstrumentedEthClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
start := time.Now()
gasTipCap, err := i.c.SuggestGasTipCap(ctx)
if err != nil {
metrics.RecordErrorDetails(i.providerName, "ethclient.SuggestGasTipCap", err)
return nil, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "SuggestGasTipCap", time.Since(start))
return gasTipCap, err
}
func (i *InstrumentedEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
start := time.Now()
header, err := i.c.HeaderByNumber(ctx, number)
if err != nil {
metrics.RecordErrorDetails(i.providerName, "ethclient.HeaderByNumber", err)
return nil, err
}
metrics.RecordRPCLatency(i.providerName, "ethclient", "HeaderByNumber", time.Since(start))
return header, err
}
func (i *InstrumentedEthClient) ignorableErrors(err error) bool { func (i *InstrumentedEthClient) ignorableErrors(err error) bool {
msg := err.Error() msg := err.Error()
// we dont use errors.Is because eth client actually uses errors.New, // we dont use errors.Is because eth client actually uses errors.New,

@ -22,7 +22,7 @@ func NewSignerClient(providerName string, logger log.Logger, endpoint string, tl
start := time.Now() start := time.Now()
c, err := signer.NewSignerClient(logger, endpoint, tlsConfig) c, err := signer.NewSignerClient(logger, endpoint, tlsConfig)
if err != nil { if err != nil {
metrics.RecordError(providerName, "signer.NewSignerClient") metrics.RecordErrorDetails(providerName, "signer.NewSignerClient", err)
return nil, err return nil, err
} }
metrics.RecordRPCLatency(providerName, "signer", "NewSignerClient", time.Since(start)) metrics.RecordRPCLatency(providerName, "signer", "NewSignerClient", time.Since(start))
@ -33,7 +33,7 @@ func (i *InstrumentedSignerClient) SignTransaction(ctx context.Context, chainId
start := time.Now() start := time.Now()
tx, err := i.c.SignTransaction(ctx, chainId, tx) tx, err := i.c.SignTransaction(ctx, chainId, tx)
if err != nil { if err != nil {
metrics.RecordError(i.providerName, "signer.SignTransaction") metrics.RecordErrorDetails(i.providerName, "signer.SignTransaction", err)
return nil, err return nil, err
} }
metrics.RecordRPCLatency(i.providerName, "signer", "SignTransaction", time.Since(start)) metrics.RecordRPCLatency(i.providerName, "signer", "SignTransaction", time.Since(start))

@ -2,6 +2,7 @@ package provider
import ( import (
"context" "context"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
@ -21,7 +22,7 @@ import (
// RoundTrip send a new transaction to measure round trip latency // RoundTrip send a new transaction to measure round trip latency
func (p *Provider) RoundTrip(ctx context.Context) { func (p *Provider) RoundTrip(ctx context.Context) {
log.Debug("roundTripLatency", log.Debug("RoundTrip",
"provider", p.name) "provider", p.name)
client, err := iclients.Dial(p.name, p.config.URL) client, err := iclients.Dial(p.name, p.config.URL)
@ -33,33 +34,38 @@ func (p *Provider) RoundTrip(ctx context.Context) {
return return
} }
var nonce uint64 p.txPool.ExclusiveSend.Lock()
p.txPool.M.Lock() defer p.txPool.ExclusiveSend.Unlock()
if p.txPool.Nonce == uint64(0) {
nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address)
if err != nil {
log.Error("cant get nounce",
"provider", p.name,
"err", err)
p.txPool.M.Unlock()
return
}
p.txPool.Nonce = nonce
} else {
p.txPool.Nonce++
nonce = p.txPool.Nonce
}
p.txPool.M.Unlock()
txHash := common.Hash{} txHash := common.Hash{}
attempt := 0 attempt := 0
nonce := uint64(0)
// used for timeout // used for timeout
firstAttemptAt := time.Now() firstAttemptAt := time.Now()
// used for actual round trip time (disregard retry time) // used for actual round trip time (disregard retry time)
roundTripStartedAt := time.Now() var roundTripStartedAt time.Time
for { for {
tx := p.createTx(nonce)
txHash = tx.Hash() // sleep until we get a clear to send
for {
coolDown := time.Duration(p.config.SendTransactionCoolDown) - time.Since(p.txPool.LastSend)
if coolDown > 0 {
time.Sleep(coolDown)
} else {
break
}
}
tx, err := p.createTx(ctx, client, nonce)
nonce = tx.Nonce()
if err != nil {
log.Error("cant create tx",
"provider", p.name,
"nonce", nonce,
"err", err)
return
}
signedTx, err := p.sign(ctx, tx) signedTx, err := p.sign(ctx, tx)
if err != nil { if err != nil {
@ -69,7 +75,6 @@ func (p *Provider) RoundTrip(ctx context.Context) {
"err", err) "err", err)
return return
} }
txHash = signedTx.Hash() txHash = signedTx.Hash()
roundTripStartedAt = time.Now() roundTripStartedAt = time.Now()
@ -78,25 +83,29 @@ func (p *Provider) RoundTrip(ctx context.Context) {
if err.Error() == txpool.ErrAlreadyKnown.Error() || if err.Error() == txpool.ErrAlreadyKnown.Error() ||
err.Error() == txpool.ErrReplaceUnderpriced.Error() || err.Error() == txpool.ErrReplaceUnderpriced.Error() ||
err.Error() == core.ErrNonceTooLow.Error() { err.Error() == core.ErrNonceTooLow.Error() {
log.Warn("cant send transaction (retryable)",
"provider", p.name,
"err", err,
"nonce", nonce)
if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) { if time.Since(firstAttemptAt) >= time.Duration(p.config.SendTransactionRetryTimeout) {
log.Error("send transaction timed out (known already)", log.Error("send transaction timed out (known already)",
"provider", p.name, "provider", p.name,
"hash", txHash.Hex(), "hash", txHash.Hex(),
"nonce", nonce,
"elapsed", time.Since(firstAttemptAt), "elapsed", time.Since(firstAttemptAt),
"attempt", attempt, "attempt", attempt)
"nonce", nonce) metrics.RecordErrorDetails(p.name, "send.timeout", err)
metrics.RecordError(p.name, "ethclient.SendTransaction.nonce")
return return
} }
log.Warn("tx already known, incrementing nonce and trying again", log.Warn("tx already known, incrementing nonce and trying again",
"provider", p.name, "provider", p.name,
"nonce", nonce) "nonce", nonce)
time.Sleep(time.Duration(p.config.SendTransactionRetryInterval)) time.Sleep(time.Duration(p.config.SendTransactionRetryInterval))
p.txPool.M.Lock() nonce++
p.txPool.Nonce++
nonce = p.txPool.Nonce
p.txPool.M.Unlock()
attempt++ attempt++
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("retrying send transaction...", log.Debug("retrying send transaction...",
@ -108,6 +117,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
} else { } else {
log.Error("cant send transaction", log.Error("cant send transaction",
"provider", p.name, "provider", p.name,
"nonce", nonce,
"err", err) "err", err)
metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err) metrics.RecordErrorDetails(p.name, "ethclient.SendTransaction", err)
return return
@ -131,6 +141,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
SentAt: sentAt, SentAt: sentAt,
SeenBy: make(map[string]time.Time), SeenBy: make(map[string]time.Time),
} }
p.txPool.LastSend = sentAt
p.txPool.M.Unlock() p.txPool.M.Unlock()
var receipt *types.Receipt var receipt *types.Receipt
@ -140,13 +151,17 @@ func (p *Provider) RoundTrip(ctx context.Context) {
log.Error("receipt retrieval timed out", log.Error("receipt retrieval timed out",
"provider", p.name, "provider", p.name,
"hash", txHash, "hash", txHash,
"nonce", nonce,
"elapsed", time.Since(sentAt)) "elapsed", time.Since(sentAt))
metrics.RecordErrorDetails(p.name, "receipt.timeout", err)
return return
} }
time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval))
if attempt%10 == 0 { if attempt%10 == 0 {
log.Debug("checking for receipt...", log.Debug("checking for receipt...",
"provider", p.name, "provider", p.name,
"hash", txHash,
"nonce", nonce,
"attempt", attempt, "attempt", attempt,
"elapsed", time.Since(sentAt)) "elapsed", time.Since(sentAt))
} }
@ -155,6 +170,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
log.Error("cant get receipt for transaction", log.Error("cant get receipt for transaction",
"provider", p.name, "provider", p.name,
"hash", txHash.Hex(), "hash", txHash.Hex(),
"nonce", nonce,
"err", err) "err", err)
return return
} }
@ -168,6 +184,7 @@ func (p *Provider) RoundTrip(ctx context.Context) {
log.Info("got transaction receipt", log.Info("got transaction receipt",
"hash", txHash.Hex(), "hash", txHash.Hex(),
"nonce", nonce,
"roundTripLatency", roundTripLatency, "roundTripLatency", roundTripLatency,
"provider", p.name, "provider", p.name,
"blockNumber", receipt.BlockNumber, "blockNumber", receipt.BlockNumber,
@ -175,20 +192,83 @@ func (p *Provider) RoundTrip(ctx context.Context) {
"gasUsed", receipt.GasUsed) "gasUsed", receipt.GasUsed)
} }
func (p *Provider) createTx(nonce uint64) *types.Transaction { func (p *Provider) createTx(ctx context.Context, client *iclients.InstrumentedEthClient, nonce uint64) (*types.Transaction, error) {
toAddress := common.HexToAddress(p.walletConfig.Address) var err error
if nonce == 0 {
nonce, err = client.PendingNonceAt(ctx, p.walletConfig.Address)
if err != nil {
log.Error("cant get nounce",
"provider", p.name,
"nonce", nonce,
"err", err)
return nil, err
}
}
gasTipCap, err := client.SuggestGasTipCap(ctx)
if err != nil {
log.Error("cant get gas tip cap",
"provider", p.name,
"err", err)
return nil, err
}
gasTipCap = new(big.Int).Mul(gasTipCap, big.NewInt(110))
gasTipCap = new(big.Int).Div(gasTipCap, big.NewInt(100))
head, err := client.HeaderByNumber(ctx, nil)
if err != nil {
log.Error("cant get base fee from head",
"provider", p.name,
"err", err)
return nil, err
}
baseFee := head.BaseFee
gasFeeCap := new(big.Int).Add(
gasTipCap,
new(big.Int).Mul(baseFee, big.NewInt(2)))
addr := common.HexToAddress(p.walletConfig.Address)
var data []byte var data []byte
tx := types.NewTx(&types.DynamicFeeTx{ dynamicTx := &types.DynamicFeeTx{
ChainID: &p.walletConfig.ChainID, ChainID: &p.walletConfig.ChainID,
Nonce: nonce, Nonce: nonce,
GasFeeCap: &p.walletConfig.GasFeeCap, GasFeeCap: gasFeeCap,
GasTipCap: &p.walletConfig.GasTipCap, GasTipCap: gasTipCap,
Gas: p.walletConfig.GasLimit, To: &addr,
To: &toAddress,
Value: &p.walletConfig.TxValue, Value: &p.walletConfig.TxValue,
Data: data, Data: data,
}
gas, err := client.EstimateGas(ctx, ethereum.CallMsg{
From: addr,
To: &addr,
GasFeeCap: gasFeeCap,
GasTipCap: gasTipCap,
Data: dynamicTx.Data,
Value: dynamicTx.Value,
}) })
return tx if err != nil {
log.Error("cant estimate gas",
"provider", p.name,
"err", err)
return nil, err
}
dynamicTx.Gas = gas
tx := types.NewTx(dynamicTx)
log.Info("tx created",
"provider", p.name,
"from", addr,
"to", dynamicTx.To,
"nonce", dynamicTx.Nonce,
"value", dynamicTx.Value,
"gas", dynamicTx.Gas,
"gasTipCap", dynamicTx.GasTipCap,
"gasFeeCap", dynamicTx.GasFeeCap,
)
return tx, nil
} }
func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) { func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {

@ -15,7 +15,11 @@ type NetworkTransactionPool struct {
M sync.Mutex M sync.Mutex
Transactions map[string]*TransactionState Transactions map[string]*TransactionState
Expected int Expected int
Nonce uint64
// Last time a transaction was sent
LastSend time.Time
// Prevents concurrent transaction send
ExclusiveSend sync.Mutex
} }
type TransactionState struct { type TransactionState struct {