pool: remove pool2 from legacypool

emailtovamos 2024-09-25 16:05:01 +01:00
parent 09575625db
commit 846e55b9a4
6 changed files with 59 additions and 164 deletions

@@ -91,7 +91,6 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolPool2SlotsFlag,
utils.TxPoolPool3SlotsFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,

@@ -450,12 +450,6 @@ var (
Value: ethconfig.Defaults.TxPool.GlobalQueue,
Category: flags.TxPoolCategory,
}
TxPoolPool2SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool2slots",
Usage: "Maximum number of transaction slots in pool 2",
Value: ethconfig.Defaults.TxPool.Pool2Slots,
Category: flags.TxPoolCategory,
}
TxPoolPool3SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool3slots",
Usage: "Maximum number of transaction slots in pool 3",
@@ -1774,9 +1768,6 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.Uint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.IsSet(TxPoolPool2SlotsFlag.Name) {
cfg.Pool2Slots = ctx.Uint64(TxPoolPool2SlotsFlag.Name)
}
if ctx.IsSet(TxPoolPool3SlotsFlag.Name) {
cfg.Pool3Slots = ctx.Uint64(TxPoolPool3SlotsFlag.Name)
}
@@ -2310,7 +2301,6 @@ func EnableNodeInfo(poolConfig *legacypool.Config, nodeInfo *p2p.NodeInfo) Setup
"GlobalSlots": poolConfig.GlobalSlots,
"AccountQueue": poolConfig.AccountQueue,
"GlobalQueue": poolConfig.GlobalQueue,
"Pool2Slots": poolConfig.Pool2Slots,
"Pool3Slots": poolConfig.Pool3Slots,
"Lifetime": poolConfig.Lifetime,
})

@@ -104,7 +104,6 @@ var (
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
pool2Gauge = metrics.NewRegisteredGauge("txpool/pool2", nil)
pool3Gauge = metrics.NewRegisteredGauge("txpool/pool3", nil)
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
@@ -140,7 +139,6 @@ type Config struct {
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
Pool2Slots uint64 // Maximum number of transaction slots in pool 2
Pool3Slots uint64 // Maximum number of transaction slots in pool 3
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
@@ -159,7 +157,6 @@ var DefaultConfig = Config{
GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio
AccountQueue: 64,
GlobalQueue: 1024,
Pool2Slots: 1024,
Pool3Slots: 1024,
Lifetime: 3 * time.Hour,
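
Note: with Pool2Slots gone, the pool's whole slot budget collapses to pool1's pending plus queued capacity. A minimal sketch of that invariant (the helper name is ours, not part of this change):

// maxPool1Slots mirrors the capacity checks in add() and truncateQueue()
// after this commit: the total budget is GlobalSlots (pending) plus
// GlobalQueue (queued); with the defaults above, (4096+1024)+1024 = 6144.
func maxPool1Slots(cfg Config) uint64 {
	return cfg.GlobalSlots + cfg.GlobalQueue
}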
@@ -475,6 +472,7 @@ func (pool *LegacyPool) Close() error {
// Reset implements txpool.SubPool, allowing the legacy pool's internal state to be
// kept in sync with the main transaction pool's internal state.
func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
fmt.Println("Reset request")
wait := pool.requestReset(oldHead, newHead)
<-wait
}
@@ -781,7 +779,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
maxPool1Size := pool.config.GlobalSlots + pool.config.GlobalQueue
maxPool2Size := pool.config.Pool2Slots
txPoolSizeAfterCurrentTx := uint64(pool.all.Slots() + numSlots(tx))
// Make the local flag. If it's from local source or it's from the network but
@@ -821,7 +818,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// If the transaction pool is full, discard underpriced transactions
if txPoolSizeAfterCurrentTx > (maxPool1Size + maxPool2Size) {
if txPoolSizeAfterCurrentTx > maxPool1Size {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
@@ -841,7 +838,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation.
toBeDiscarded := pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue+pool.config.Pool2Slots) + numSlots(tx)
toBeDiscarded := pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx)
drop, success := pool.priced.Discard(toBeDiscarded, isLocal)
// Special case, we still can't make the room for the new remote one.
@@ -903,7 +900,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx, false) // At this point pool1 can incorporate this. So no need for pool2 or pool3
pool.queueTxEvent(tx, false)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// Successful promotion, bump the heartbeat
@@ -912,7 +909,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, true) // At this point pool1 can incorporate this. So no need for pool2 or pool3
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, true)
if err != nil {
return false, err
}
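
Note: the slot arithmetic in add() relies on upstream legacypool's sizing, where one slot is 32KB and a transaction occupies at most four slots. For reference, the upstream go-ethereum definitions this diff builds on:

const (
	txSlotSize = 32 * 1024      // one slot is 32KB
	txMaxSize  = 4 * txSlotSize // so a transaction spans at most 4 slots
)

// numSlots calculates the number of slots a single transaction occupies,
// rounding its encoded size up to whole 32KB slots.
func numSlots(tx *types.Transaction) int {
	return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}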
@@ -943,57 +940,15 @@ func (pool *LegacyPool) addToPool3(drop types.Transactions, isLocal bool) {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsPool3 {
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, isLocal, false, false, true)
pool.localBufferPool.Add(tx)
log.Debug("adding to pool3", "transaction", tx.Hash().String(), "from", from.String())
currentSlotsUsed += txSlots
}
}
}
}
// addToPool12OrPool3 adds a transaction to pool1 or pool2 or pool3 depending on which one is asked for
func (pool *LegacyPool) addToPool12OrPool3(tx *types.Transaction, from common.Address, isLocal bool, pool1, pool2, pool3 bool) (bool, error) {
if pool1 {
pool.journalTx(from, tx)
pool.queueTxEvent(tx, false)
_, err := pool.enqueueTx(tx.Hash(), tx, isLocal, true, false) // At this point pool1 can incorporate this. So no need for pool2 or pool3
if err != nil {
return false, err
}
dirty := newAccountSet(pool.signer)
dirty.addTx(tx)
go func() {
<-pool.requestPromoteExecutables(dirty)
}()
log.Trace("Pooled new executable transaction", "hash", tx.Hash(), "from", from, "to", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return true, nil
} else if pool2 {
pool.journalTx(from, tx)
pool.queueTxEvent(tx, true)
_, err := pool.enqueueTx(tx.Hash(), tx, isLocal, true, true)
if err != nil {
return false, err
}
dirty := newAccountSet(pool.signer)
dirty.addTx(tx)
pool2Gauge.Inc(1)
go func() {
<-pool.requestPromoteExecutables(dirty)
}()
log.Trace("Pooled new executable transaction", "hash", tx.Hash(), "from", from, "to", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return true, nil
} else if pool3 {
pool.localBufferPool.Add(tx)
log.Debug("adding to pool3", "transaction", tx.Hash().String())
return true, nil
} else {
return false, errors.New("could not add to any pool")
log.Debug("adding to pool3 unsuccessful", "availableSlotsPool3", availableSlotsPool3)
fmt.Println("adding to pool3 unsuccessful")
}
}
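
Note: after this change pool3 (the localBufferPool) is only touched through Add, Flush and Size. The minimal surface it must provide, inferred from the calls in this diff (the interface name is ours, for illustration):

// txBuffer is the pool3 contract as used by addToPool3, transferTransactions
// and the tests below: bounded storage with best-effort draining.
type txBuffer interface {
	Add(tx *types.Transaction)        // buffer one transaction
	Flush(n int) []*types.Transaction // pop up to n transactions
	Size() int                        // current number of buffered txs
}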
@@ -1448,7 +1403,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
reorgDurationTimer.Update(time.Since(t0))
}(time.Now())
defer close(done)
fmt.Println("runReorg called")
var promoteAddrs []common.Address
if dirtyAccounts != nil && reset == nil {
// Only dirty accounts need to be promoted, unless we're resetting.
@@ -1475,9 +1430,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
}
//// Transfer transactions from pool3 to pool2 for new block import
//pool.transferTransactions()
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
@@ -1510,7 +1462,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock()
// Transfer transactions from pool3 to pool2 for new block import
// Transfer transactions from pool3 to pool1 for new block import
pool.transferTransactions()
// Notify subsystems for newly added transactions
@@ -1650,6 +1602,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
fmt.Println("promoteExecutables called")
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
@@ -1685,10 +1638,8 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
}
}
log.Trace("Promoted queued transactions", "count", len(promoted))
fmt.Println("promoting")
queuedGauge.Dec(int64(len(readies)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(readies)))
}
// Drop all transactions over the allowed limit
var caps types.Transactions
@@ -1704,9 +1655,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
@@ -1815,7 +1764,7 @@ func (pool *LegacyPool) truncateQueue() {
for _, list := range pool.queue {
queued += uint64(list.Len())
}
queueMax := pool.config.GlobalQueue + pool.config.Pool2Slots
queueMax := pool.config.GlobalQueue
if queued <= queueMax {
return
}
@@ -2210,31 +2159,30 @@ func (pool *LegacyPool) startPeriodicTransfer(t time.Duration) {
// transferTransactions mainly moves from pool 3 to pool 1
func (pool *LegacyPool) transferTransactions() {
maxPool1Size := int(pool.config.GlobalSlots + pool.config.GlobalQueue)
maxPool2Size := int(pool.config.Pool2Slots)
maxPool1Pool2CombinedSize := maxPool1Size + maxPool2Size
extraSizePool2Pool1 := maxPool1Pool2CombinedSize - int(uint64(len(pool.pending))+uint64(len(pool.queue)))
if extraSizePool2Pool1 <= 0 {
extraSizePool1 := maxPool1Size - int(uint64(len(pool.pending))+uint64(len(pool.queue)))
if extraSizePool1 <= 0 {
return
}
currentPool1Pool2Size := pool.all.Slots()
canTransferPool3ToPool2 := maxPool1Pool2CombinedSize > currentPool1Pool2Size
if !canTransferPool3ToPool2 {
currentPool1Size := pool.all.Slots()
canTransferPool3ToPool1 := maxPool1Size > currentPool1Size
if !canTransferPool3ToPool1 {
return
}
extraSlots := maxPool1Pool2CombinedSize - currentPool1Pool2Size
extraSlots := maxPool1Size - currentPool1Size
extraTransactions := (extraSlots + 3) / 4 // Since maximum slots per transaction is 4
// So now we can take out extraTransactions number of transactions from pool3 and put in pool2
// So now we can take out extraTransactions number of transactions from pool3 and put in pool1
if extraTransactions < 1 {
return
}
log.Debug("Will attempt to transfer from pool3 to pool2", "transactions", extraTransactions)
log.Debug("Will attempt to transfer from pool3 to pool1", "transactions", extraTransactions)
tx := pool.localBufferPool.Flush(extraTransactions)
if len(tx) == 0 {
return
}
fmt.Println("transferring tranasction")
pool.Add(tx, true, false)
}
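
Note: the (extraSlots + 3) / 4 above is ceiling division by the worst-case four slots per transaction, so a 5-slot surplus still pulls two transactions out of pool3; the Add call then re-applies the pool limits and drops any overflow. A sketch of the rounding:

// ceilDiv4 mirrors the sizing in transferTransactions: how many worst-case
// (4-slot) transactions fit into n free slots, rounding up rather than down.
func ceilDiv4(n int) int { return (n + 3) / 4 }

// e.g. ceilDiv4(5) == 2 and ceilDiv4(4) == 1; rounding up may overshoot by
// one transaction, which the subsequent Add tolerates by enforcing limits.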

@@ -1741,7 +1741,6 @@ func TestRepricingKeepsLocals(t *testing.T) {
func TestUnderpricing(t *testing.T) {
t.Parallel()
testTxPoolConfig.Pool3Slots = 5
testTxPoolConfig.Pool2Slots = 0
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
@@ -1932,9 +1931,8 @@ func TestUnderpricingDynamicFee(t *testing.T) {
defer pool.Close()
pool.config.GlobalSlots = 2
pool.config.GlobalQueue = 1
pool.config.GlobalQueue = 2
pool.config.Pool2Slots = 1
pool.config.Pool3Slots = 0
// Keep track of transaction events to ensure all executables get announced
@@ -1962,7 +1960,7 @@ func TestUnderpricingDynamicFee(t *testing.T) {
// Import the batch and that both pending and queued transactions match up
pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1
pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1
fmt.Println("pool.addLocal(ltx) done")
pending, queued := pool.Stats()
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
@@ -1970,9 +1968,11 @@ func TestUnderpricingDynamicFee(t *testing.T) {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
fmt.Println("before validateEvents")
if err := validateEvents(events, 3); err != nil {
t.Fatalf("original event firing failed: %v", err)
}
fmt.Println("after validateEvents")
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -2008,7 +2008,7 @@ func TestUnderpricingDynamicFee(t *testing.T) {
}
fmt.Println("Stats before validateEvents")
pool.printTxStats()
if err := validateEvents(events, 4); err != nil { // todo make it 4...After this validateEvents the pending becomes 3?!
if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
fmt.Println("Stats after validateEvents")
@@ -2027,10 +2027,10 @@ func TestUnderpricingDynamicFee(t *testing.T) {
}
pending, queued = pool.Stats()
if pending != 5 { // 3
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if queued != 0 { // 1
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if err := validateEvents(events, 2); err != nil {
@@ -2046,14 +2046,13 @@ func TestUnderpricingDynamicFee(t *testing.T) {
func TestDualHeapEviction(t *testing.T) {
t.Parallel()
testTxPoolConfig.Pool3Slots = 5
testTxPoolConfig.Pool3Slots = 1
pool, _ := setupPoolWithConfig(eip1559Config)
defer pool.Close()
pool.config.GlobalSlots = 2
pool.config.GlobalQueue = 2
pool.config.Pool2Slots = 1
pool.config.Pool3Slots = 5
pool.config.Pool3Slots = 1
var (
highTip, highCap *types.Transaction
@@ -2071,7 +2070,7 @@ func TestDualHeapEviction(t *testing.T) {
}
add := func(urgent bool) {
for i := 0; i < 5; i++ {
for i := 0; i < 4; i++ {
var tx *types.Transaction
// Create a test accounts and fund it
key, _ := crypto.GenerateKey()
@@ -2098,7 +2097,7 @@ func TestDualHeapEviction(t *testing.T) {
}
}
pending, queued := pool.Stats()
if pending+queued != 5 {
if pending+queued != 4 {
t.Fatalf("transaction count mismatch: have %d, want %d, pending %d, queued %d, pool3 %d", pending+queued, 5, pending, queued, pool.localBufferPool.Size())
}
}
@@ -2271,89 +2270,43 @@ func TestTransferTransactions(t *testing.T) {
defer pool.Close()
pool.config.GlobalSlots = 1
pool.config.GlobalQueue = 1
pool.config.Pool2Slots = 1
pool.config.GlobalQueue = 2
pool.config.Pool3Slots = 1
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
}
tx := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, true, false, false, true)
//time.Sleep(6 * time.Second)
pool.addToPool3([]*types.Transaction{tx}, true)
pending, queue := pool.Stats()
p3Size := pool.StatsPool3()
if pending != 0 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 0)
}
assert.Equal(t, 1, p3Size, "pool3 size unexpected")
assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.StatsPool3(), "pool3 size unexpected")
tx2 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[1])
from2, _ := types.Sender(pool.signer, tx2)
pool.addToPool12OrPool3(tx2, from2, true, false, false, true)
pool.addToPool3([]*types.Transaction{tx2}, true)
assert.Equal(t, 1, pool.StatsPool3(), "pool3 size unexpected")
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
//time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 0 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
assert.Equal(t, 0, pending, "pending transactions mismatched")
assert.Equal(t, 1, queue, "queued transactions mismatched")
assert.Equal(t, 0, pool.StatsPool3(), "pool3 size unexpected")
tx3 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[2])
from3, _ := types.Sender(pool.signer, tx3)
pool.addToPool12OrPool3(tx3, from3, true, false, false, true)
//time.Sleep(6 * time.Second)
pool.addToPool3([]*types.Transaction{tx3}, true)
pending, queue = pool.Stats()
if pending != 0 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
tx4 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[3])
from4, _ := types.Sender(pool.signer, tx4)
pool.addToPool12OrPool3(tx4, from4, true, false, false, true)
//time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 0 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
bufferSize := pool.localBufferPool.Size()
if bufferSize != 2 {
t.Errorf("buffer transactions mismatched: have %d, want %d", bufferSize, 1)
}
tx5 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[4])
from5, _ := types.Sender(pool.signer, tx5)
pool.addToPool12OrPool3(tx5, from5, true, false, false, true)
//time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 0 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
bufferSize = pool.localBufferPool.Size()
if bufferSize != 3 {
t.Errorf("buffer transactions mismatched: have %d, want %d", bufferSize, 1)
}
assert.Equal(t, 1, pending, "pending transactions mismatched")
assert.Equal(t, 0, queue, "queued transactions mismatched")
assert.Equal(t, 1, pool.StatsPool3(), "pool3 size unexpected")
}
// Tests that the pool rejects replacement dynamic fee transactions that don't

@@ -851,6 +851,8 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, staticOnly bool)
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
)
// For pool-2 transactions, p = static + sqrt(peers) peers are available in total.
// Send the tx directly to sqrt(p) of them and announce it to the remaining (p - sqrt(p)) peers.
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
peers := h.peers.peersWithoutTransaction(tx.Hash())
@@ -865,7 +867,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, staticOnly bool)
numDirect = int(math.Sqrt(float64(len(peers))))
}
if staticOnly {
numDirect = int(math.Cbrt(float64(len(peers))))
numDirect = int(math.Sqrt(math.Sqrt(float64(len(peers)))))
}
// Send the tx unconditionally to a subset of our peers
for _, peer := range peers[:numDirect] {
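
Note: the fan-out after this change sends the full body to sqrt(peers) for normal transactions, while static-only (pool-2 style) broadcasts drop from the previous cube root to the fourth root, with the rest of the peers getting only a hash announcement. A condensed, standalone sketch of the selection (uses the standard "math" package; the surrounding threshold checks are omitted):

// directFanout returns how many peers receive the full transaction body;
// the remaining peers only get an announcement.
func directFanout(peerCount int, staticOnly bool) int {
	if staticOnly {
		// fourth root, sqrt(sqrt(n)): e.g. 16 peers -> 2 direct sends
		return int(math.Sqrt(math.Sqrt(float64(peerCount))))
	}
	// square root: e.g. 16 peers -> 4 direct sends
	return int(math.Sqrt(float64(peerCount)))
}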

@@ -235,6 +235,9 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash, staticOnly bool) {
case p.txBroadcast <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
if staticOnly && p.Peer.Info().Network.Static {
p.Log().Debug("Sent pool-2 transaction", "count", len(hashes))
}
default:
// Handle the case when the channel is full or not ready
p.Log().Debug("Unable to send transactions, channel full or not ready", "count", len(hashes))