pool: remove pool2 related code

This commit is contained in:
emailtovamos 2024-09-26 14:13:13 +01:00
parent 846e55b9a4
commit 6a6e09c849
8 changed files with 58 additions and 135 deletions

@ -22,13 +22,7 @@ import (
)
// NewTxsEvent is posted when a batch of transactions enters the transaction pool.
type NewTxsEvent struct {
Txs []*types.Transaction
// Static bool is Whether to send to only Static peer or not.
// This is because at high traffic we still want to broadcast transactions to at least some peers so that we
// minimize the transaction lost.
Static bool
}
type NewTxsEvent struct{ Txs []*types.Transaction }
// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

@ -30,10 +30,6 @@ var (
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrUnderpricedTransferredtoAnotherPool is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpricedTransferredtoAnotherPool = errors.New("transaction underpriced, so it is either in pool2 or pool3")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")

@ -244,7 +244,7 @@ type LegacyPool struct {
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan QueueTxEventCh
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
@ -280,7 +280,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan QueueTxEventCh),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
@ -405,7 +405,7 @@ func (pool *LegacyPool) loop() {
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list, _ := pool.queue[addr].Flatten()
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true, true)
}
@ -416,27 +416,25 @@ func (pool *LegacyPool) loop() {
case <-reannounce.C:
pool.mu.RLock()
reannoTxs, _ := func() ([]*types.Transaction, []bool) {
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
statics := make([]bool, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}
transactions, static := list.Flatten()
transactions := list.Flatten()
for _, tx := range transactions {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
statics = append(statics, static)
if len(txs) >= txReannoMaxNum {
return txs, statics
return txs
}
}
}
return txs, statics
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
@ -567,11 +565,11 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[
pending := make(map[common.Address][]*types.Transaction, len(pool.pending))
for addr, list := range pool.pending {
pending[addr], _ = list.Flatten()
pending[addr] = list.Flatten()
}
queued := make(map[common.Address][]*types.Transaction, len(pool.queue))
for addr, list := range pool.queue {
queued[addr], _ = list.Flatten()
queued[addr] = list.Flatten()
}
return pending, queued
}
@ -584,11 +582,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
var pending []*types.Transaction
if list, ok := pool.pending[addr]; ok {
pending, _ = list.Flatten()
pending = list.Flatten()
}
var queued []*types.Transaction
if list, ok := pool.queue[addr]; ok {
queued, _ = list.Flatten()
queued = list.Flatten()
}
return pending, queued
}
@ -620,7 +618,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs, static := list.Flatten()
txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
@ -643,7 +641,6 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
Static: static,
}
}
pending[addr] = lazies
@ -667,11 +664,11 @@ func (pool *LegacyPool) local() map[common.Address]types.Transactions {
txs := make(map[common.Address]types.Transactions)
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
transactions, _ := pending.Flatten()
transactions := pending.Flatten()
txs[addr] = append(txs[addr], transactions...)
}
if queued := pool.queue[addr]; queued != nil {
transactions, _ := queued.Flatten()
transactions := queued.Flatten()
txs[addr] = append(txs[addr], transactions...)
}
}
@ -1315,12 +1312,8 @@ func (pool *LegacyPool) requestPromoteExecutables(set *accountSet) chan struct{}
// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
func (pool *LegacyPool) queueTxEvent(tx *types.Transaction, static bool) {
event := QueueTxEventCh{
tx: tx,
static: static,
}
select {
case pool.queueTxEventCh <- event:
case pool.queueTxEventCh <- tx:
case <-pool.reorgShutdownCh:
}
}
@ -1374,14 +1367,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun = true
pool.reorgDoneCh <- nextDone
case queue := <-pool.queueTxEventCh:
case tx := <-pool.queueTxEventCh:
// Queue up the event, but don't schedule a reorg. It's up to the caller to
// request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, queue.tx)
addr, _ := types.Sender(pool.signer, tx)
if _, ok := queuedEvents[addr]; !ok {
queuedEvents[addr] = newSortedMap()
}
queuedEvents[addr].Put(queue.tx, queue.static)
queuedEvents[addr].Put(tx)
case <-curDone:
curDone = nil
@ -1449,7 +1442,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
highestPending, _ := list.LastElement()
highestPending := list.LastElement()
nonces[addr] = highestPending.Nonce() + 1
}
pool.pendingNonces.setAll(nonces)
@ -1471,31 +1464,14 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if _, ok := events[addr]; !ok {
events[addr] = newSortedMap()
}
events[addr].Put(tx, false) // todo putting false as placeholder for now
events[addr].Put(tx)
}
if len(events) > 0 {
staticTxs := make([]*types.Transaction, 0)
nonStaticTxs := make([]*types.Transaction, 0)
var txs []*types.Transaction
for _, set := range events {
flattenedTxs, _ := set.Flatten()
if set.staticOnly {
staticTxs = append(staticTxs, flattenedTxs...)
} else {
nonStaticTxs = append(nonStaticTxs, flattenedTxs...)
}
}
// Send static transactions
if len(staticTxs) > 0 {
fmt.Println("New txevent emitted for static ", staticTxs[0].Hash())
pool.txFeed.Send(core.NewTxsEvent{Txs: staticTxs, Static: true})
}
// Send dynamic transactions
if len(nonStaticTxs) > 0 {
from, _ := types.Sender(pool.signer, nonStaticTxs[0])
fmt.Println("New txevent emitted for non static ", nonStaticTxs[0].Hash(), len(nonStaticTxs), from.String())
pool.txFeed.Send(core.NewTxsEvent{Txs: nonStaticTxs, Static: false})
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
}
@ -1787,7 +1763,7 @@ func (pool *LegacyPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
transactions, _ := list.Flatten()
transactions := list.Flatten()
for _, tx := range transactions {
pool.removeTx(tx.Hash(), true, true)
}
@ -1796,7 +1772,7 @@ func (pool *LegacyPool) truncateQueue() {
continue
}
// Otherwise drop only last few transactions
txs, _ := list.Flatten()
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true, true)
drop--

@ -63,11 +63,10 @@ func (h *nonceHeap) Pop() interface{} {
// sortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type sortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
staticOnly bool // only send this transaction to static peers
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
}
// newSortedMap creates a new nonce-sorted transaction map.
@ -85,7 +84,7 @@ func (m *sortedMap) Get(nonce uint64) *types.Transaction {
// Put inserts a new transaction into the map, also updating the map's nonce
// index. If a transaction already exists with the same nonce, it's overwritten.
func (m *sortedMap) Put(tx *types.Transaction, static bool) {
func (m *sortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce()
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
@ -95,7 +94,6 @@ func (m *sortedMap) Put(tx *types.Transaction, static bool) {
txSortedMapPool.Put(m.cache)
}
m.items[nonce], m.cache = tx, nil
m.staticOnly = static
m.cacheMu.Unlock()
}
@ -255,7 +253,7 @@ func (m *sortedMap) Len() int {
return len(m.items)
}
func (m *sortedMap) flatten() (types.Transactions, bool) {
func (m *sortedMap) flatten() types.Transactions {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
@ -272,25 +270,25 @@ func (m *sortedMap) flatten() (types.Transactions, bool) {
}
sort.Sort(types.TxByNonce(m.cache))
}
return m.cache, m.staticOnly
return m.cache
}
// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *sortedMap) Flatten() (types.Transactions, bool) {
cache, static := m.flatten()
func (m *sortedMap) Flatten() types.Transactions {
cache := m.flatten()
// Copy the cache to prevent accidental modification
txs := make(types.Transactions, len(cache))
copy(txs, cache)
return txs, static
return txs
}
// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *sortedMap) LastElement() (*types.Transaction, bool) {
cache, static := m.flatten()
return cache[len(cache)-1], static
func (m *sortedMap) LastElement() *types.Transaction {
cache := m.flatten()
return cache[len(cache)-1]
}
// list is a "list" of transactions belonging to an account, sorted by account
@ -362,7 +360,7 @@ func (l *list) Add(tx *types.Transaction, priceBump uint64, static bool) (bool,
l.totalcost.Add(l.totalcost, cost)
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx, static)
l.txs.Put(tx)
if l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}
@ -477,13 +475,13 @@ func (l *list) Empty() bool {
// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (l *list) Flatten() (types.Transactions, bool) {
func (l *list) Flatten() types.Transactions {
return l.txs.Flatten()
}
// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *list) LastElement() (*types.Transaction, bool) {
func (l *list) LastElement() *types.Transaction {
return l.txs.LastElement()
}

@ -41,8 +41,6 @@ type LazyTransaction struct {
Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
Static bool // To specify whether to broadcast it to static peers or not
}
// Resolve retrieves the full transaction belonging to a lazy handle if it is still

@ -838,7 +838,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions, staticOnly bool) {
func (h *handler) BroadcastTransactions(txs types.Transactions) {
var (
blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only
@ -866,9 +866,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, staticOnly bool)
default:
numDirect = int(math.Sqrt(float64(len(peers))))
}
if staticOnly {
numDirect = int(math.Sqrt(math.Sqrt(float64(len(peers)))))
}
// Send the tx unconditionally to a subset of our peers
for _, peer := range peers[:numDirect] {
txset[peer] = append(txset[peer], tx.Hash())
@ -881,12 +879,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, staticOnly bool)
for peer, hashes := range txset {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes, staticOnly)
peer.AsyncSendTransactions(hashes)
}
for peer, hashes := range annos {
annPeers++
annCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes, staticOnly)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
@ -904,7 +902,7 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) {
peersCount := uint(math.Sqrt(float64(h.peers.len())))
peers := h.peers.headPeers(peersCount)
for _, peer := range peers {
peer.AsyncSendPooledTransactionHashes(hashes, false) // todo keeping it false for now. Reannounce never really happens
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction reannounce", "txs", len(txs),
"announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes)))
@ -967,7 +965,7 @@ func (h *handler) txBroadcastLoop() {
for {
select {
case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs, event.Static)
h.BroadcastTransactions(event.Txs)
case <-h.txsSub.Err():
return
case <-h.stopCh:

@ -222,29 +222,12 @@ func (p *Peer) SendTransactions(txs types.Transactions) error {
// AsyncSendTransactions queues a list of transactions (by hash) to eventually
// propagate to a remote peer. The number of pending sends are capped (new ones
// will force old sends to be dropped)
func (p *Peer) AsyncSendTransactions(hashes []common.Hash, staticOnly bool) {
// p.Peer.Info().Network.Static bool decides if pool2 transaction will be broadcasted to that peer or not
func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
select {
case <-p.txTerm:
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
case <-p.term:
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
default:
if (staticOnly && p.Peer.Info().Network.Static) || !staticOnly {
select {
case p.txBroadcast <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
if staticOnly && p.Peer.Info().Network.Static {
p.Log().Debug("Sent pool-2 transaction", "count", len(hashes))
}
default:
// Handle the case when the channel is full or not ready
p.Log().Debug("Unable to send transactions, channel full or not ready", "count", len(hashes))
}
} else {
p.Log().Debug("Not sending transactions as peer not static", "count", len(hashes))
}
}
}
@ -264,25 +247,15 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash, types []byte, s
// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
// announce to a remote peer. The number of pending sends are capped (new ones
// will force old sends to be dropped)
func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash, staticOnly bool) {
func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
select {
case p.txAnnounce <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
case <-p.txTerm:
p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
case <-p.term:
p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
default:
if (staticOnly && p.Peer.Info().Network.Static) || !staticOnly {
select {
case p.txAnnounce <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
default:
// Handle the case when the channel is full or not ready
p.Log().Debug("Unable to send transactions, channel full or not ready", "count", len(hashes))
}
} else {
p.Log().Debug("Not sending transactions as peer not static", "count", len(hashes))
}
}
}

@ -35,26 +35,16 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashesFalse []common.Hash
var hashesTrue []common.Hash
var hashes []common.Hash
for _, batch := range h.txpool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) {
for _, tx := range batch {
if tx.Static {
hashesTrue = append(hashesTrue, tx.Hash)
} else {
hashesFalse = append(hashesFalse, tx.Hash)
}
hashes = append(hashes, tx.Hash)
}
}
if len(hashesFalse) > 0 {
p.AsyncSendPooledTransactionHashes(hashesFalse, false)
}
if len(hashesTrue) > 0 {
p.AsyncSendPooledTransactionHashes(hashesTrue, true)
if len(hashes) == 0 {
return
}
p.AsyncSendPooledTransactionHashes(hashes)
}
// syncVotes starts sending all currently pending votes to the given peer.