core: make txpool reject too sudden changes (#23095)

* core: make txpool reject too sudden changes

* core: add some metrics to txpool
This commit is contained in:
Martin Holst Swende 2021-08-24 20:48:36 +02:00 committed by GitHub
parent 5cee33eb72
commit d705f5a554
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 7 deletions

@ -111,6 +111,14 @@ var (
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil) overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil)
// throttleTxMeter counts how many transactions are rejected due to too-many-changes between
// txpool reorgs.
throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil)
// reorgDurationTimer measures how long time a txpool reorg takes.
reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil)
// dropBetweenReorgHistogram counts how many drops we experience between two reorg runs. It is expected
// that this number is pretty low, since txpool reorgs happen very frequently.
dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
@ -256,6 +264,8 @@ type TxPool struct {
reorgDoneCh chan chan struct{} reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
} }
type txpoolResetRequest struct { type txpoolResetRequest struct {
@ -663,6 +673,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
underpricedTxMeter.Mark(1) underpricedTxMeter.Mark(1)
return false, ErrUnderpriced return false, ErrUnderpriced
} }
// We're about to replace a transaction. The reorg does a more thorough
// analysis of what to remove and how, but it runs async. We don't want to
// do too many replacements between reorg-runs, so we cap the number of
// replacements to 25% of the slots
if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
throttleTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// New transaction is better than our worse ones, make room for it. // New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions. // If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation. // Otherwise if we can't make enough room for new one, abort the operation.
@ -674,6 +693,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
overflowedTxMeter.Mark(1) overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow return false, ErrTxPoolOverflow
} }
// Bump the counter of rejections-since-reorg
pool.changesSinceReorg += len(drop)
// Kick out the underpriced remote transactions. // Kick out the underpriced remote transactions.
for _, tx := range drop { for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
@ -1114,6 +1135,9 @@ func (pool *TxPool) scheduleReorgLoop() {
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) { func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0))
}(time.Now())
defer close(done) defer close(done)
var promoteAddrs []common.Address var promoteAddrs []common.Address
@ -1163,6 +1187,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
highestPending := list.LastElement() highestPending := list.LastElement()
pool.pendingNonces.set(addr, highestPending.Nonce()+1) pool.pendingNonces.set(addr, highestPending.Nonce()+1)
} }
dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock() pool.mu.Unlock()
// Notify subsystems for newly added transactions // Notify subsystems for newly added transactions

@ -1946,20 +1946,20 @@ func TestDualHeapEviction(t *testing.T) {
} }
add := func(urgent bool) { add := func(urgent bool) {
txs := make([]*types.Transaction, 20) for i := 0; i < 20; i++ {
for i := range txs { var tx *types.Transaction
// Create a test accounts and fund it // Create a test accounts and fund it
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000000)) testAddBalance(pool, crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000000))
if urgent { if urgent {
txs[i] = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+1+i)), big.NewInt(int64(1+i)), key) tx = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+1+i)), big.NewInt(int64(1+i)), key)
highTip = txs[i] highTip = tx
} else { } else {
txs[i] = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+200+i)), big.NewInt(1), key) tx = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+200+i)), big.NewInt(1), key)
highCap = txs[i] highCap = tx
} }
pool.AddRemotesSync([]*types.Transaction{tx})
} }
pool.AddRemotes(txs)
pending, queued := pool.Stats() pending, queued := pool.Stats()
if pending+queued != 20 { if pending+queued != 20 {
t.Fatalf("transaction count mismatch: have %d, want %d", pending+queued, 10) t.Fatalf("transaction count mismatch: have %d, want %d", pending+queued, 10)