From 675449a1d9e02f130d6e23d5bc2803d8925c9fb0 Mon Sep 17 00:00:00 2001 From: Satyajit Das Date: Thu, 17 Oct 2024 13:23:06 +0530 Subject: [PATCH] core/txpool/legacypool: add overflowpool for txs (#2660) --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 30 +- core/txpool/legacypool/legacypool.go | 119 +++++++- core/txpool/legacypool/legacypool_test.go | 84 +++++- core/txpool/legacypool/tx_overflowpool.go | 171 +++++++++++ .../txpool/legacypool/tx_overflowpool_test.go | 266 ++++++++++++++++++ 6 files changed, 634 insertions(+), 37 deletions(-) create mode 100644 core/txpool/legacypool/tx_overflowpool.go create mode 100644 core/txpool/legacypool/tx_overflowpool_test.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3a041fcc3..5eb8fa414 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -92,6 +92,7 @@ var ( utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, + utils.TxPoolOverflowPoolSlotsFlag, utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, utils.BlobPoolDataDirFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 207628c16..f3aab43d7 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -458,6 +458,12 @@ var ( Value: ethconfig.Defaults.TxPool.GlobalQueue, Category: flags.TxPoolCategory, } + TxPoolOverflowPoolSlotsFlag = &cli.Uint64Flag{ + Name: "txpool.overflowpoolslots", + Usage: "Maximum number of transaction slots in overflow pool", + Value: ethconfig.Defaults.TxPool.OverflowPoolSlots, + Category: flags.TxPoolCategory, + } TxPoolLifetimeFlag = &cli.DurationFlag{ Name: "txpool.lifetime", Usage: "Maximum amount of time non-executable transaction are queued", @@ -1789,6 +1795,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { if ctx.IsSet(TxPoolGlobalQueueFlag.Name) { cfg.GlobalQueue = ctx.Uint64(TxPoolGlobalQueueFlag.Name) } + if ctx.IsSet(TxPoolOverflowPoolSlotsFlag.Name) { + cfg.OverflowPoolSlots = ctx.Uint64(TxPoolOverflowPoolSlotsFlag.Name) + } if ctx.IsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name) } @@ -2310,16 +2319,17 @@ func EnableNodeInfo(poolConfig *legacypool.Config, nodeInfo *p2p.NodeInfo) Setup return func() { // register node info into metrics metrics.NewRegisteredLabel("node-info", nil).Mark(map[string]interface{}{ - "Enode": nodeInfo.Enode, - "ENR": nodeInfo.ENR, - "ID": nodeInfo.ID, - "PriceLimit": poolConfig.PriceLimit, - "PriceBump": poolConfig.PriceBump, - "AccountSlots": poolConfig.AccountSlots, - "GlobalSlots": poolConfig.GlobalSlots, - "AccountQueue": poolConfig.AccountQueue, - "GlobalQueue": poolConfig.GlobalQueue, - "Lifetime": poolConfig.Lifetime, + "Enode": nodeInfo.Enode, + "ENR": nodeInfo.ENR, + "ID": nodeInfo.ID, + "PriceLimit": poolConfig.PriceLimit, + "PriceBump": poolConfig.PriceBump, + "AccountSlots": poolConfig.AccountSlots, + "GlobalSlots": poolConfig.GlobalSlots, + "AccountQueue": poolConfig.AccountQueue, + "GlobalQueue": poolConfig.GlobalQueue, + "OverflowPoolSlots": poolConfig.OverflowPoolSlots, + "Lifetime": poolConfig.Lifetime, }) } } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 91cd01e7b..0d5a1fb18 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -19,6 +19,7 @@ package legacypool import ( "errors" + "fmt" "math" "math/big" "sort" @@ -99,10 +100,11 @@ var ( // that this number is pretty low, since txpool reorgs happen very frequently. dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015)) - pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) - queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) - slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) + slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) + OverflowPoolGauge = metrics.NewRegisteredGauge("txpool/overflowpool", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) ) @@ -133,10 +135,11 @@ type Config struct { PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) - AccountSlots uint64 // Number of executable transaction slots guaranteed per account - GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts - AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account - GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts + AccountSlots uint64 // Number of executable transaction slots guaranteed per account + GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts + AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account + GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts + OverflowPoolSlots uint64 // Maximum number of transaction slots in overflow pool Lifetime time.Duration // Maximum amount of time non-executable transaction are queued ReannounceTime time.Duration // Duration for announcing local pending transactions again @@ -150,10 +153,11 @@ var DefaultConfig = Config{ PriceLimit: 1, PriceBump: 10, - AccountSlots: 16, - GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio - AccountQueue: 64, - GlobalQueue: 1024, + AccountSlots: 16, + GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio + AccountQueue: 64, + GlobalQueue: 1024, + OverflowPoolSlots: 0, Lifetime: 3 * time.Hour, ReannounceTime: 10 * 365 * 24 * time.Hour, @@ -235,6 +239,8 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price + localBufferPool *TxOverflowPool // Local buffer transactions + reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet queueTxEventCh chan *types.Transaction @@ -272,6 +278,7 @@ func New(config Config, chain BlockChain) *LegacyPool { reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), + localBufferPool: NewTxOverflowPoolHeap(config.OverflowPoolSlots), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -408,7 +415,6 @@ func (pool *LegacyPool) loop() { if !pool.locals.contains(addr) { continue } - for _, tx := range list.Flatten() { // Default ReannounceTime is 10 years, won't announce by default. if time.Since(tx.Time()) < pool.config.ReannounceTime { @@ -517,6 +523,17 @@ func (pool *LegacyPool) Stats() (int, int) { return pool.stats() } +func (pool *LegacyPool) statsOverflowPool() int { + pool.mu.RLock() + defer pool.mu.RUnlock() + + if pool.localBufferPool == nil { + return 0 + } + + return pool.localBufferPool.Size() +} + // stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. func (pool *LegacyPool) stats() (int, int) { @@ -831,6 +848,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } } + pool.addToOverflowPool(drop, isLocal) + // Kick out the underpriced remote transactions. for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) @@ -887,6 +906,29 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e return replaced, nil } +func (pool *LegacyPool) addToOverflowPool(drop types.Transactions, isLocal bool) { + // calculate total number of slots in drop. Accordingly add them to OverflowPool (if there is space) + availableSlotsOverflowPool := pool.availableSlotsOverflowPool() + if availableSlotsOverflowPool > 0 { + // transfer availableSlotsOverflowPool number of transactions slots from drop to OverflowPool + currentSlotsUsed := 0 + for i, tx := range drop { + txSlots := numSlots(tx) + if currentSlotsUsed+txSlots <= availableSlotsOverflowPool { + from, _ := types.Sender(pool.signer, tx) + pool.localBufferPool.Add(tx) + log.Debug("adding to OverflowPool", "transaction", tx.Hash().String(), "from", from.String()) + currentSlotsUsed += txSlots + } else { + log.Debug("not all got added to OverflowPool", "totalAdded", i+1) + return + } + } + } else { + log.Debug("adding to OverflowPool unsuccessful", "availableSlotsOverflowPool", availableSlotsOverflowPool) + } +} + // isGapped reports whether the given transaction is immediately executable. func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) bool { // Short circuit if transaction falls within the scope of the pending list @@ -1333,7 +1375,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, reorgDurationTimer.Update(time.Since(t0)) }(time.Now()) defer close(done) - var promoteAddrs []common.Address if dirtyAccounts != nil && reset == nil { // Only dirty accounts need to be promoted, unless we're resetting. @@ -1391,6 +1432,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.changesSinceReorg = 0 // Reset change counter pool.mu.Unlock() + // Transfer transactions from OverflowPool to MainPool for new block import + pool.transferTransactions() + // Notify subsystems for newly added transactions for _, tx := range promoted { addr, _ := types.Sender(pool.signer, tx) @@ -2038,3 +2082,50 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { func numSlots(tx *types.Transaction) int { return int((tx.Size() + txSlotSize - 1) / txSlotSize) } + +// transferTransactions moves transactions from OverflowPool to MainPool +func (pool *LegacyPool) transferTransactions() { + // Fail fast if the overflow pool is empty + if pool.localBufferPool.Size() == 0 { + return + } + + maxMainPoolSize := int(pool.config.GlobalSlots + pool.config.GlobalQueue) + // Use pool.all.Slots() to get the total slots used by all transactions + currentMainPoolSize := pool.all.Slots() + if currentMainPoolSize >= maxMainPoolSize { + return + } + + extraSlots := maxMainPoolSize - currentMainPoolSize + extraTransactions := (extraSlots + 3) / 4 // Since a transaction can take up to 4 slots + log.Debug("Will attempt to transfer from OverflowPool to MainPool", "transactions", extraTransactions) + txs := pool.localBufferPool.Flush(extraTransactions) + if len(txs) == 0 { + return + } + + pool.Add(txs, true, false) +} + +func (pool *LegacyPool) availableSlotsOverflowPool() int { + maxOverflowPoolSize := int(pool.config.OverflowPoolSlots) + availableSlots := maxOverflowPoolSize - pool.localBufferPool.Size() + if availableSlots > 0 { + return availableSlots + } + return 0 +} + +func (pool *LegacyPool) PrintTxStats() { + for _, l := range pool.pending { + for _, transaction := range l.txs.items { + from, _ := types.Sender(pool.signer, transaction) + fmt.Println("from: ", from, " Pending:", transaction.Hash().String(), transaction.GasFeeCap(), transaction.GasTipCap()) + } + } + + pool.localBufferPool.PrintTxStats() + fmt.Println("length of all: ", pool.all.Slots()) + fmt.Println("----------------------------------------------------") +} diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 5f7a625f1..53c62b9bd 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" "github.com/holiman/uint256" + "github.com/stretchr/testify/assert" ) var ( @@ -1739,6 +1740,7 @@ func TestRepricingKeepsLocals(t *testing.T) { // Note, local transactions are never allowed to be dropped. func TestUnderpricing(t *testing.T) { t.Parallel() + testTxPoolConfig.OverflowPoolSlots = 5 // Create the pool to test the pricing enforcement with statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) @@ -1931,6 +1933,8 @@ func TestUnderpricingDynamicFee(t *testing.T) { pool.config.GlobalSlots = 2 pool.config.GlobalQueue = 2 + pool.config.OverflowPoolSlots = 0 + // Keep track of transaction events to ensure all executables get announced events := make(chan core.NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) @@ -1955,7 +1959,6 @@ func TestUnderpricingDynamicFee(t *testing.T) { // Import the batch and that both pending and queued transactions match up pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1 pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1 - pending, queued := pool.Stats() if pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) @@ -1995,9 +1998,9 @@ func TestUnderpricingDynamicFee(t *testing.T) { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) } if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } - if err := validateEvents(events, 2); err != nil { + if err := validateEvents(events, 2); err != nil { // todo make it 4...After this validateEvents the pending becomes 3?! t.Fatalf("additional event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { @@ -2012,11 +2015,12 @@ func TestUnderpricingDynamicFee(t *testing.T) { if err := pool.addLocal(ltx); err != nil { t.Fatalf("failed to add new underpriced local transaction: %v", err) } + pending, queued = pool.Stats() - if pending != 3 { + if pending != 3 { // 3 t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } - if queued != 1 { + if queued != 1 { // 1 t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } if err := validateEvents(events, 2); err != nil { @@ -2032,41 +2036,51 @@ func TestUnderpricingDynamicFee(t *testing.T) { func TestDualHeapEviction(t *testing.T) { t.Parallel() + testTxPoolConfig.OverflowPoolSlots = 1 pool, _ := setupPoolWithConfig(eip1559Config) defer pool.Close() - pool.config.GlobalSlots = 10 - pool.config.GlobalQueue = 10 + pool.config.GlobalSlots = 2 + pool.config.GlobalQueue = 2 + pool.config.OverflowPoolSlots = 1 var ( highTip, highCap *types.Transaction baseFee int + highCapValue int64 + highTipValue int64 ) check := func(tx *types.Transaction, name string) { if pool.all.GetRemote(tx.Hash()) == nil { - t.Fatalf("highest %s transaction evicted from the pool", name) + t.Fatalf("highest %s transaction evicted from the pool, gasTip: %s, gasFeeCap: %s, hash: %s", name, highTip.GasTipCap().String(), highCap.GasFeeCap().String(), tx.Hash().String()) } } add := func(urgent bool) { - for i := 0; i < 20; i++ { + for i := 0; i < 4; i++ { var tx *types.Transaction // Create a test accounts and fund it key, _ := crypto.GenerateKey() testAddBalance(pool, crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000000)) if urgent { tx = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+1+i)), big.NewInt(int64(1+i)), key) - highTip = tx + if int64(1+i) > highTipValue || (int64(1+i) == highTipValue && int64(baseFee+1+i) > highTip.GasFeeCap().Int64()) { + highTipValue = int64(1 + i) + highTip = tx + } } else { tx = dynamicFeeTx(0, 100000, big.NewInt(int64(baseFee+200+i)), big.NewInt(1), key) - highCap = tx + if int64(baseFee+200+i) > highCapValue { + highCapValue = int64(baseFee + 200 + i) + highCap = tx + } } pool.addRemotesSync([]*types.Transaction{tx}) } pending, queued := pool.Stats() - if pending+queued != 20 { - t.Fatalf("transaction count mismatch: have %d, want %d", pending+queued, 10) + if pending+queued != 4 { + t.Fatalf("transaction count mismatch: have %d, want %d, pending %d, queued %d, OverflowPool %d", pending+queued, 5, pending, queued, pool.localBufferPool.Size()) } } @@ -2231,6 +2245,50 @@ func TestReplacement(t *testing.T) { } } +func TestTransferTransactions(t *testing.T) { + t.Parallel() + testTxPoolConfig.OverflowPoolSlots = 1 + pool, _ := setupPoolWithConfig(eip1559Config) + defer pool.Close() + + pool.config.GlobalSlots = 1 + pool.config.GlobalQueue = 2 + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + + tx := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0]) + from, _ := types.Sender(pool.signer, tx) + pool.addToOverflowPool([]*types.Transaction{tx}, true) + pending, queue := pool.Stats() + + assert.Equal(t, 0, pending, "pending transactions mismatched") + assert.Equal(t, 0, queue, "queued transactions mismatched") + assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected") + + tx2 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[1]) + pool.addToOverflowPool([]*types.Transaction{tx2}, true) + assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected") + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) + pending, queue = pool.Stats() + + assert.Equal(t, 0, pending, "pending transactions mismatched") + assert.Equal(t, 1, queue, "queued transactions mismatched") + assert.Equal(t, 0, pool.statsOverflowPool(), "OverflowPool size unexpected") + + tx3 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[2]) + pool.addToOverflowPool([]*types.Transaction{tx3}, true) + pending, queue = pool.Stats() + + assert.Equal(t, 1, pending, "pending transactions mismatched") + assert.Equal(t, 0, queue, "queued transactions mismatched") + assert.Equal(t, 1, pool.statsOverflowPool(), "OverflowPool size unexpected") +} + // Tests that the pool rejects replacement dynamic fee transactions that don't // meet the minimum price bump required. func TestReplacementDynamicFee(t *testing.T) { diff --git a/core/txpool/legacypool/tx_overflowpool.go b/core/txpool/legacypool/tx_overflowpool.go new file mode 100644 index 000000000..4bfd4b6f5 --- /dev/null +++ b/core/txpool/legacypool/tx_overflowpool.go @@ -0,0 +1,171 @@ +package legacypool + +import ( + "container/heap" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// txHeapItem implements the Interface interface (https://pkg.go.dev/container/heap#Interface) of heap so that it can be heapified +type txHeapItem struct { + tx *types.Transaction + timestamp int64 // Unix timestamp (nanoseconds) of when the transaction was added + index int +} + +type txHeap []*txHeapItem + +func (h txHeap) Len() int { return len(h) } +func (h txHeap) Less(i, j int) bool { + return h[i].timestamp < h[j].timestamp +} +func (h txHeap) Swap(i, j int) { + if i < 0 || j < 0 || i >= len(h) || j >= len(h) { + return // Silently fail if indices are out of bounds + } + h[i], h[j] = h[j], h[i] + if h[i] != nil { + h[i].index = i + } + if h[j] != nil { + h[j].index = j + } +} + +func (h *txHeap) Push(x interface{}) { + item, ok := x.(*txHeapItem) + if !ok { + return + } + n := len(*h) + item.index = n + *h = append(*h, item) +} + +func (h *txHeap) Pop() interface{} { + old := *h + n := len(old) + if n == 0 { + return nil // Return nil if the heap is empty + } + item := old[n-1] + old[n-1] = nil // avoid memory leak + *h = old[0 : n-1] + if item != nil { + item.index = -1 // for safety + } + return item +} + +type TxOverflowPool struct { + txHeap txHeap + index map[common.Hash]*txHeapItem + mu sync.RWMutex + maxSize uint64 + totalSize int +} + +func NewTxOverflowPoolHeap(estimatedMaxSize uint64) *TxOverflowPool { + return &TxOverflowPool{ + txHeap: make(txHeap, 0, estimatedMaxSize), + index: make(map[common.Hash]*txHeapItem, estimatedMaxSize), + maxSize: estimatedMaxSize, + } +} + +func (tp *TxOverflowPool) Add(tx *types.Transaction) { + tp.mu.Lock() + defer tp.mu.Unlock() + + if _, exists := tp.index[tx.Hash()]; exists { + // Transaction already in pool, ignore + return + } + + if uint64(len(tp.txHeap)) >= tp.maxSize { + // Remove the oldest transaction to make space + oldestItem, ok := heap.Pop(&tp.txHeap).(*txHeapItem) + if !ok || oldestItem == nil { + return + } + delete(tp.index, oldestItem.tx.Hash()) + tp.totalSize -= numSlots(oldestItem.tx) + OverflowPoolGauge.Dec(1) + } + + item := &txHeapItem{ + tx: tx, + timestamp: time.Now().UnixNano(), + } + heap.Push(&tp.txHeap, item) + tp.index[tx.Hash()] = item + tp.totalSize += numSlots(tx) + OverflowPoolGauge.Inc(1) +} + +func (tp *TxOverflowPool) Get(hash common.Hash) (*types.Transaction, bool) { + tp.mu.RLock() + defer tp.mu.RUnlock() + if item, ok := tp.index[hash]; ok { + return item.tx, true + } + return nil, false +} + +func (tp *TxOverflowPool) Remove(hash common.Hash) { + tp.mu.Lock() + defer tp.mu.Unlock() + if item, ok := tp.index[hash]; ok { + heap.Remove(&tp.txHeap, item.index) + delete(tp.index, hash) + tp.totalSize -= numSlots(item.tx) + OverflowPoolGauge.Dec(1) + } +} + +func (tp *TxOverflowPool) Flush(n int) []*types.Transaction { + tp.mu.Lock() + defer tp.mu.Unlock() + if n > tp.txHeap.Len() { + n = tp.txHeap.Len() + } + txs := make([]*types.Transaction, n) + for i := 0; i < n; i++ { + item, ok := heap.Pop(&tp.txHeap).(*txHeapItem) + if !ok || item == nil { + continue + } + txs[i] = item.tx + delete(tp.index, item.tx.Hash()) + tp.totalSize -= numSlots(item.tx) + } + + OverflowPoolGauge.Dec(int64(n)) + return txs +} + +func (tp *TxOverflowPool) Len() int { + tp.mu.RLock() + defer tp.mu.RUnlock() + return tp.txHeap.Len() +} + +func (tp *TxOverflowPool) Size() int { + tp.mu.RLock() + defer tp.mu.RUnlock() + return tp.totalSize +} + +func (tp *TxOverflowPool) PrintTxStats() { + tp.mu.RLock() + defer tp.mu.RUnlock() + for _, item := range tp.txHeap { + tx := item.tx + fmt.Printf("Hash: %s, Timestamp: %d, GasFeeCap: %s, GasTipCap: %s\n", + tx.Hash().String(), item.timestamp, tx.GasFeeCap().String(), tx.GasTipCap().String()) + } +} diff --git a/core/txpool/legacypool/tx_overflowpool_test.go b/core/txpool/legacypool/tx_overflowpool_test.go new file mode 100644 index 000000000..9a4aee500 --- /dev/null +++ b/core/txpool/legacypool/tx_overflowpool_test.go @@ -0,0 +1,266 @@ +package legacypool + +import ( + "math/big" + rand2 "math/rand" + "testing" + "time" + + "github.com/cometbft/cometbft/libs/rand" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Helper function to create a test transaction +func createTestTx(nonce uint64, gasPrice *big.Int) *types.Transaction { + to := common.HexToAddress("0x1234567890123456789012345678901234567890") + return types.NewTransaction(nonce, to, big.NewInt(1000), 21000, gasPrice, nil) +} + +func TestNewTxOverflowPoolHeap(t *testing.T) { + pool := NewTxOverflowPoolHeap(0) + if pool == nil { + t.Fatal("NewTxOverflowPoolHeap returned nil") + } + if pool.Len() != 0 { + t.Errorf("New pool should be empty, got length %d", pool.Len()) + } +} + +func TestTxOverflowPoolHeapAdd(t *testing.T) { + pool := NewTxOverflowPoolHeap(1) + tx := createTestTx(1, big.NewInt(1000)) + + pool.Add(tx) + if pool.Len() != 1 { + t.Errorf("Pool should have 1 transaction, got %d", pool.Len()) + } + + // Add the same transaction again + pool.Add(tx) + if pool.Len() != 1 { + t.Errorf("Pool should still have 1 transaction after adding duplicate, got %d", pool.Len()) + } +} + +func TestTxOverflowPoolHeapGet(t *testing.T) { + pool := NewTxOverflowPoolHeap(1) + tx := createTestTx(1, big.NewInt(1000)) + pool.Add(tx) + + gotTx, exists := pool.Get(tx.Hash()) + if !exists { + t.Fatal("Get returned false for existing transaction") + } + if gotTx.Hash() != tx.Hash() { + t.Errorf("Get returned wrong transaction. Want %v, got %v", tx.Hash(), gotTx.Hash()) + } + + _, exists = pool.Get(common.Hash{}) + if exists { + t.Error("Get returned true for non-existent transaction") + } +} + +func TestTxOverflowPoolHeapRemove(t *testing.T) { + pool := NewTxOverflowPoolHeap(1) + tx := createTestTx(1, big.NewInt(1000)) + pool.Add(tx) + + pool.Remove(tx.Hash()) + if pool.Len() != 0 { + t.Errorf("Pool should be empty after removing the only transaction, got length %d", pool.Len()) + } + + // Try to remove non-existent transaction + pool.Remove(common.Hash{}) + if pool.Len() != 0 { + t.Error("Removing non-existent transaction should not affect pool size") + } +} + +func TestTxOverflowPoolHeapPopN(t *testing.T) { + pool := NewTxOverflowPoolHeap(3) + tx1 := createTestTx(1, big.NewInt(1000)) + tx2 := createTestTx(2, big.NewInt(2000)) + tx3 := createTestTx(3, big.NewInt(3000)) + + pool.Add(tx1) + time.Sleep(time.Millisecond) // Ensure different timestamps + pool.Add(tx2) + time.Sleep(time.Millisecond) + pool.Add(tx3) + + popped := pool.Flush(2) + if len(popped) != 2 { + t.Fatalf("PopN(2) should return 2 transactions, got %d", len(popped)) + } + if popped[0].Hash() != tx1.Hash() || popped[1].Hash() != tx2.Hash() { + t.Error("PopN returned transactions in wrong order") + } + if pool.Len() != 1 { + t.Errorf("Pool should have 1 transaction left, got %d", pool.Len()) + } + + // Pop more than available + popped = pool.Flush(2) + if len(popped) != 1 { + t.Fatalf("PopN(2) should return 1 transaction when only 1 is left, got %d", len(popped)) + } + if popped[0].Hash() != tx3.Hash() { + t.Error("PopN returned wrong transaction") + } + if pool.Len() != 0 { + t.Errorf("Pool should be empty, got length %d", pool.Len()) + } +} + +func TestTxOverflowPoolHeapOrdering(t *testing.T) { + pool := NewTxOverflowPoolHeap(3) + tx1 := createTestTx(1, big.NewInt(1000)) + tx2 := createTestTx(2, big.NewInt(2000)) + tx3 := createTestTx(3, big.NewInt(3000)) + + pool.Add(tx2) + time.Sleep(time.Millisecond) // Ensure different timestamps + pool.Add(tx1) + pool.Add(tx3) // Added immediately after tx1, should have same timestamp but higher sequence + + popped := pool.Flush(3) + if len(popped) != 3 { + t.Fatalf("PopN(3) should return 3 transactions, got %d", len(popped)) + } + if popped[0].Hash() != tx2.Hash() || popped[1].Hash() != tx1.Hash() || popped[2].Hash() != tx3.Hash() { + t.Error("Transactions not popped in correct order (earliest timestamp first, then by sequence)") + } +} + +func TestTxOverflowPoolHeapLen(t *testing.T) { + pool := NewTxOverflowPoolHeap(2) + if pool.Len() != 0 { + t.Errorf("New pool should have length 0, got %d", pool.Len()) + } + + pool.Add(createTestTx(1, big.NewInt(1000))) + if pool.Len() != 1 { + t.Errorf("Pool should have length 1 after adding a transaction, got %d", pool.Len()) + } + + pool.Add(createTestTx(2, big.NewInt(2000))) + if pool.Len() != 2 { + t.Errorf("Pool should have length 2 after adding another transaction, got %d", pool.Len()) + } + + pool.Flush(1) + if pool.Len() != 1 { + t.Errorf("Pool should have length 1 after popping a transaction, got %d", pool.Len()) + } +} + +// Helper function to create a random test transaction +func createRandomTestTx() *types.Transaction { + nonce := uint64(rand.Intn(1000000)) + to := common.BytesToAddress(rand.Bytes(20)) + amount := new(big.Int).Rand(rand2.New(rand2.NewSource(rand.Int63())), big.NewInt(1e18)) + gasLimit := uint64(21000) + gasPrice := new(big.Int).Rand(rand2.New(rand2.NewSource(rand.Int63())), big.NewInt(1e9)) + data := rand.Bytes(100) + return types.NewTransaction(nonce, to, amount, gasLimit, gasPrice, data) +} + +func createRandomTestTxs(n int) []*types.Transaction { + txs := make([]*types.Transaction, n) + for i := 0; i < n; i++ { + txs[i] = createRandomTestTx() + } + return txs +} + +// goos: darwin +// goarch: arm64 +// pkg: github.com/ethereum/go-ethereum/core/txpool/legacypool +// BenchmarkTxOverflowPoolHeapAdd-8 813326 2858 ns/op +func BenchmarkTxOverflowPoolHeapAdd(b *testing.B) { + pool := NewTxOverflowPoolHeap(uint64(b.N)) + txs := createRandomTestTxs(b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Add(txs[i]) + } +} + +// BenchmarkTxOverflowPoolHeapGet-8 32613938 35.63 ns/op +func BenchmarkTxOverflowPoolHeapGet(b *testing.B) { + pool := NewTxOverflowPoolHeap(1000) + txs := createRandomTestTxs(1000) + for _, tx := range txs { + pool.Add(tx) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Get(txs[i%1000].Hash()) + } +} + +// BenchmarkTxOverflowPoolHeapRemove-8 3020841 417.8 ns/op +func BenchmarkTxOverflowPoolHeapRemove(b *testing.B) { + pool := NewTxOverflowPoolHeap(uint64(b.N)) + txs := createRandomTestTxs(b.N) + for _, tx := range txs { + pool.Add(tx) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Remove(txs[i].Hash()) + } +} + +// BenchmarkTxOverflowPoolHeapFlush-8 42963656 29.90 ns/op +func BenchmarkTxOverflowPoolHeapFlush(b *testing.B) { + pool := NewTxOverflowPoolHeap(1000) + txs := createRandomTestTxs(1000) + for _, tx := range txs { + pool.Add(tx) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Flush(10) + } +} + +// BenchmarkTxOverflowPoolHeapLen-8 79147188 20.07 ns/op +func BenchmarkTxOverflowPoolHeapLen(b *testing.B) { + pool := NewTxOverflowPoolHeap(1000) + txs := createRandomTestTxs(1000) + for _, tx := range txs { + pool.Add(tx) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Len() + } +} + +// BenchmarkTxOverflowPoolHeapAddRemove-8 902896 1546 ns/op +func BenchmarkTxOverflowPoolHeapAddRemove(b *testing.B) { + pool := NewTxOverflowPoolHeap(uint64(b.N)) + txs := createRandomTestTxs(b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Add(txs[i]) + pool.Remove(txs[i].Hash()) + } +} + +// BenchmarkTxOverflowPoolHeapAddFlush-8 84417 14899 ns/op +func BenchmarkTxOverflowPoolHeapAddFlush(b *testing.B) { + pool := NewTxOverflowPoolHeap(uint64(b.N * 10)) + txs := createRandomTestTxs(b.N * 10) + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < 10; j++ { + pool.Add(txs[i*10+j]) + } + pool.Flush(10) + } +}