core: move TxPool reorg and events to background goroutine (#19705)

* core: move TxPool reorg and events to background goroutine

This change moves internal queue re-shuffling work in TxPool to a
background goroutine, TxPool.runReorg. Requests to execute runReorg are
accumulated by the new scheduleReorgLoop. The new loop also accumulates
transaction events.

The motivation for this change is making sends to txFeed synchronous
instead of sending them in one-off goroutines launched by 'add' and
'promoteExecutables'. If a downstream consumer of txFeed is blocked for
a while, reorg requests and events will queue up.

* core: remove homestead check in TxPool

This change removes tracking of the homestead block number from TxPool.
The homestead field was used to enforce minimum gas of 53000 for
contract creations after the homestead fork, but not before it. Since
nobody would want configure a non-homestead chain nowadays and contract
creations usually take more than 53000 gas, the extra correctness is
redundant and can be removed.

* core: fixes for review comments

* core: remove BenchmarkPoolInsert

This is useless now because there is no separate code path for
individual transactions anymore.

* core: fix pending counter metric

* core: fix pool tests

* core: dedup txpool announced events, discard stales

* core: reorg tx promotion/demotion to avoid weird pending gaps
This commit is contained in:
Felix Lange 2019-06-21 10:29:14 +02:00 committed by Péter Szilágyi
parent 25c3282cf1
commit 60c062e17d
2 changed files with 546 additions and 456 deletions

@ -214,8 +214,6 @@ type TxPool struct {
gasPrice *big.Int gasPrice *big.Int
txFeed event.Feed txFeed event.Feed
scope event.SubscriptionScope scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer signer types.Signer
mu sync.RWMutex mu sync.RWMutex
@ -232,9 +230,18 @@ type TxPool struct {
all *txLookup // All transactions to allow lookups all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
}
homestead bool type txpoolResetRequest struct {
oldHead, newHead *types.Header
} }
// NewTxPool creates a new transaction pool to gather, sort and filter inbound // NewTxPool creates a new transaction pool to gather, sort and filter inbound
@ -254,6 +261,11 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
beats: make(map[common.Address]time.Time), beats: make(map[common.Address]time.Time),
all: newTxLookup(), all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit), gasPrice: new(big.Int).SetUint64(config.PriceLimit),
} }
pool.locals = newAccountSet(pool.signer) pool.locals = newAccountSet(pool.signer)
@ -264,6 +276,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.priced = newTxPricedList(pool.all) pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header()) pool.reset(nil, chain.CurrentBlock().Header())
// Start the reorg loop early so it can handle requests generated during journal loading.
pool.wg.Add(1)
go pool.scheduleReorgLoop()
// If local transactions and journaling is enabled, load from disk // If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" { if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal) pool.journal = newTxJournal(config.Journal)
@ -275,10 +291,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
log.Warn("Failed to rotate transaction journal", "err", err) log.Warn("Failed to rotate transaction journal", "err", err)
} }
} }
// Subscribe events from blockchain
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return // Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1) pool.wg.Add(1)
go pool.loop() go pool.loop()
@ -291,38 +306,31 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
func (pool *TxPool) loop() { func (pool *TxPool) loop() {
defer pool.wg.Done() defer pool.wg.Done()
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers // Start the stats reporting and transaction eviction tickers
var prevPending, prevQueued, prevStales int report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
report := time.NewTicker(statsReportInterval) journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop() defer report.Stop()
evict := time.NewTicker(evictionInterval)
defer evict.Stop() defer evict.Stop()
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop() defer journal.Stop()
// Track the previous head headers for transaction reorgs
head := pool.chain.CurrentBlock()
// Keep waiting for and reacting to the various events
for { for {
select { select {
// Handle ChainHeadEvent // Handle ChainHeadEvent
case ev := <-pool.chainHeadCh: case ev := <-pool.chainHeadCh:
if ev.Block != nil { if ev.Block != nil {
pool.mu.Lock() pool.requestReset(head.Header(), ev.Block.Header())
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block head = ev.Block
pool.mu.Unlock()
} }
// Be unsubscribed due to system stopped
// System shutdown.
case <-pool.chainHeadSub.Err(): case <-pool.chainHeadSub.Err():
close(pool.reorgShutdownCh)
return return
// Handle stats reporting ticks // Handle stats reporting ticks
@ -367,114 +375,6 @@ func (pool *TxPool) loop() {
} }
} }
// lockedReset is a wrapper around reset to allow calling it in a thread safe
// manner. This method is only ever used in the tester!
func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.reset(oldHead, newHead)
}
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don't have the lost transactions any more, and
// there's nothing to add
if newNum < oldNum {
// If the reorg ended up on a lower number, it's indicative of setHead being the cause
log.Debug("Skipping transaction reset caused by setHead",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
} else {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn("Transaction pool reset with missing oldhead",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
}
return
}
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
// validate the pool of pending transactions, this will remove
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
pool.demoteUnexecutables()
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
pool.promoteExecutables(nil)
}
// Stop terminates the transaction pool. // Stop terminates the transaction pool.
func (pool *TxPool) Stop() { func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool // Unsubscribe all subscriptions registered from txpool
@ -638,7 +538,8 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds return ErrInsufficientFunds
} }
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) // Ensure the transaction has more gas than the basic tx fee.
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true)
if err != nil { if err != nil {
return err return err
} }
@ -648,27 +549,28 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return nil return nil
} }
// add validates a transaction and inserts it into the non-executable queue for // add validates a transaction and inserts it into the non-executable queue for later
// later pending promotion and execution. If the transaction is a replacement for // pending promotion and execution. If the transaction is a replacement for an already
// an already pending or queued one, it overwrites the previous and returns this // pending or queued one, it overwrites the previous transaction if its price is higher.
// so outer code doesn't uselessly call promote.
// //
// If a newly added transaction is marked as local, its sending account will be // If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of // whitelisted, preventing any associated transaction from being dropped out of the pool
// the pool due to pricing constraints. // due to pricing constraints.
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it // If the transaction is already known, discard it
hash := tx.Hash() hash := tx.Hash()
if pool.all.Get(hash) != nil { if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash) log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash) return false, fmt.Errorf("known transaction: %x", hash)
} }
// If the transaction fails basic validation, discard it // If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil { if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err) log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1) invalidTxMeter.Mark(1)
return false, err return false, err
} }
// If the transaction pool is full, discard underpriced transactions // If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it // If the new transaction is underpriced, don't accept it
@ -685,7 +587,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.removeTx(tx.Hash(), false) pool.removeTx(tx.Hash(), false)
} }
} }
// If the transaction is replacing an already pending one, do directly
// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) { if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met // Nonce already pending, check if required price bump is met
@ -703,19 +606,17 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.all.Add(tx) pool.all.Add(tx)
pool.priced.Put(tx) pool.priced.Put(tx)
pool.journalTx(from, tx) pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// We've directly injected a replacement transaction, notify subsystems
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
return old != nil, nil return old != nil, nil
} }
// New transaction isn't replacing a pending one, push into queue // New transaction isn't replacing a pending one, push into queue
replace, err := pool.enqueueTx(hash, tx) replaced, err = pool.enqueueTx(hash, tx)
if err != nil { if err != nil {
return false, err return false, err
} }
// Mark local addresses and journal local transactions // Mark local addresses and journal local transactions
if local { if local {
if !pool.locals.contains(from) { if !pool.locals.contains(from) {
@ -729,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.journalTx(from, tx) pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil return replaced, nil
} }
// enqueueTx inserts a new transaction into the non-executable transaction queue. // enqueueTx inserts a new transaction into the non-executable transaction queue.
@ -817,96 +718,85 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
return true return true
} }
// AddLocal enqueues a single transaction into the pool if it is valid, marking // AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
// the sender as a local one in the mean time, ensuring it goes around the local // senders as a local ones, ensuring they go around the local pricing constraints.
// pricing constraints. //
func (pool *TxPool) AddLocal(tx *types.Transaction) error { // This method is used to add transactions from the RPC API and performs synchronous pool
return pool.addTx(tx, !pool.config.NoLocals) // reorganization and event propagation.
}
// AddRemote enqueues a single transaction into the pool if it is valid. If the
// sender is not among the locally tracked ones, full pricing constraints will
// apply.
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
return pool.addTx(tx, false)
}
// AddLocals enqueues a batch of transactions into the pool if they are valid,
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
return pool.addTxs(txs, !pool.config.NoLocals) return pool.addTxs(txs, !pool.config.NoLocals, true)
} }
// AddRemotes enqueues a batch of transactions into the pool if they are valid. // AddLocal enqueues a single local transaction into the pool if it is valid. This is
// If the senders are not among the locally tracked ones, full pricing constraints // a convenience wrapper aroundd AddLocals.
// will apply. func (pool *TxPool) AddLocal(tx *types.Transaction) error {
errs := pool.AddLocals([]*types.Transaction{tx})
return errs[0]
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
// senders are not among the locally tracked ones, full pricing constraints will apply.
//
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false) return pool.addTxs(txs, false, false)
} }
// addTx enqueues a single transaction into the pool if it is valid. // This is like AddRemotes, but waits for pool reorganization. Tests use this method.
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { func (pool *TxPool) addRemotesSync(txs []*types.Transaction) []error {
// Cache sender in transaction before obtaining lock (pool.signer is immutable) return pool.addTxs(txs, false, true)
types.Sender(pool.signer, tx) }
pool.mu.Lock() // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
defer pool.mu.Unlock() func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
errs := pool.addRemotesSync([]*types.Transaction{tx})
return errs[0]
}
// Try to inject the transaction and update any state // AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
replace, err := pool.add(tx, local) // wrapper around AddRemotes.
if err != nil { //
return err // Deprecated: use AddRemotes
} func (pool *TxPool) AddRemote(tx *types.Transaction) error {
validMeter.Mark(1) errs := pool.AddRemotes([]*types.Transaction{tx})
return errs[0]
// If we added a new transaction, run promotion checks and return
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables([]common.Address{from})
}
return nil
} }
// addTxs attempts to queue a batch of transactions if they are valid. // addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Cache senders in transactions before obtaining lock (pool.signer is immutable) // Cache senders in transactions before obtaining lock (pool.signer is immutable)
for _, tx := range txs { for _, tx := range txs {
types.Sender(pool.signer, tx) types.Sender(pool.signer, tx)
} }
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() errs, dirtyAddrs := pool.addTxsLocked(txs, local)
pool.mu.Unlock()
return pool.addTxsLocked(txs, local) done := pool.requestPromoteExecutables(dirtyAddrs)
} if sync {
<-done
// addTxsLocked attempts to queue a batch of transactions if they are valid,
// whilst assuming the transaction pool lock is already held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
// Add the batch of transactions, tracking the accepted ones
dirty := make(map[common.Address]struct{})
errs := make([]error, len(txs))
for i, tx := range txs {
var replace bool
if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
}
}
validMeter.Mark(int64(len(dirty)))
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
addrs := make([]common.Address, 0, len(dirty))
for addr := range dirty {
addrs = append(addrs, addr)
}
pool.promoteExecutables(addrs)
} }
return errs return errs
} }
// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
for i, tx := range txs {
replaced, err := pool.add(tx, local)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
}
}
validMeter.Mark(int64(len(dirty.accounts)))
return errs, dirty
}
// Status returns the status (unknown/pending/queued) of a batch of transactions // Status returns the status (unknown/pending/queued) of a batch of transactions
// identified by their hashes. // identified by their hashes.
func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
@ -927,8 +817,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
return status return status
} }
// Get returns a transaction if it is contained in the pool // Get returns a transaction if it is contained in the pool and nil otherwise.
// and nil otherwise.
func (pool *TxPool) Get(hash common.Hash) *types.Transaction { func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash) return pool.all.Get(hash)
} }
@ -984,20 +873,261 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
} }
} }
// requestPromoteExecutables requests a pool reset to the new head block.
// The returned channel is closed when the reset has occurred.
func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
select {
case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
return <-pool.reorgDoneCh
case <-pool.reorgShutdownCh:
return pool.reorgShutdownCh
}
}
// requestPromoteExecutables requests transaction promotion checks for the given addresses.
// The returned channel is closed when the promotion checks have occurred.
func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
select {
case pool.reqPromoteCh <- set:
return <-pool.reorgDoneCh
case <-pool.reorgShutdownCh:
return pool.reorgShutdownCh
}
}
// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
select {
case pool.queueTxEventCh <- tx:
case <-pool.reorgShutdownCh:
}
}
// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
// call those methods directly, but request them being run using requestReset and
// requestPromoteExecutables instead.
func (pool *TxPool) scheduleReorgLoop() {
defer pool.wg.Done()
var (
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*txSortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
// Prepare everything for the next round of reorg
curDone, nextDone = nextDone, make(chan struct{})
launchNextRun = false
reset, dirtyAccounts = nil, nil
queuedEvents = make(map[common.Address]*txSortedMap)
}
select {
case req := <-pool.reqResetCh:
// Reset request: update head if request is already pending.
if reset == nil {
reset = req
} else {
reset.newHead = req.newHead
}
launchNextRun = true
pool.reorgDoneCh <- nextDone
case req := <-pool.reqPromoteCh:
// Promote request: update address set if request is already pending.
if dirtyAccounts == nil {
dirtyAccounts = req
} else {
dirtyAccounts.merge(req)
}
launchNextRun = true
pool.reorgDoneCh <- nextDone
case tx := <-pool.queueTxEventCh:
// Queue up the event, but don't schedule a reorg. It's up to the caller to
// request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, tx)
if _, ok := queuedEvents[addr]; !ok {
queuedEvents[addr] = newTxSortedMap()
}
queuedEvents[addr].Put(tx)
case <-curDone:
curDone = nil
case <-pool.reorgShutdownCh:
// Wait for current run to finish.
if curDone != nil {
<-curDone
}
close(nextDone)
return
}
}
}
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
defer close(done)
var promoteAddrs []common.Address
if dirtyAccounts != nil {
promoteAddrs = dirtyAccounts.flatten()
}
pool.mu.Lock()
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)
// Nonces were reset, discard any events that became stale
for addr := range events {
events[addr].Forward(pool.pendingState.GetNonce(addr))
if events[addr].Len() == 0 {
delete(events, addr)
}
}
// Reset needs promote for all addresses
promoteAddrs = promoteAddrs[:0]
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx)
if _, ok := events[addr]; !ok {
events[addr] = newTxSortedMap()
}
events[addr].Put(tx)
}
// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
if reset != nil {
pool.demoteUnexecutables()
}
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
pool.truncatePending()
pool.truncateQueue()
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
pool.mu.Unlock()
// Notify subsystems for newly added transactions
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(NewTxsEvent{txs})
}
}
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don't have the lost transactions any more, and
// there's nothing to add
if newNum < oldNum {
// If the reorg ended up on a lower number, it's indicative of setHead being the cause
log.Debug("Skipping transaction reset caused by setHead",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
} else {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn("Transaction pool reset with missing oldhead",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
}
return
}
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
}
// promoteExecutables moves transactions that have become processable from the // promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all // future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted. // invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) { func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once // Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction var promoted []*types.Transaction
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions // Iterate over all accounts and promote any executable transactions
for _, addr := range accounts { for _, addr := range accounts {
list := pool.queue[addr] list := pool.queue[addr]
@ -1053,16 +1183,21 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
delete(pool.queue, addr) delete(pool.queue, addr)
} }
} }
// Notify subsystem for new promoted transactions. return promoted
if len(promoted) > 0 { }
go pool.txFeed.Send(NewTxsEvent{promoted})
} // truncatePending removes transactions from the pending queue if the pool is above the
// If the pending limit is overflown, start equalizing allowances // pending limit. The algorithm tries to reduce transaction counts by an approximately
// equal number for all for accounts with many pending transactions.
func (pool *TxPool) truncatePending() {
pending := uint64(0) pending := uint64(0)
for _, list := range pool.pending { for _, list := range pool.pending {
pending += uint64(list.Len()) pending += uint64(list.Len())
} }
if pending > pool.config.GlobalSlots { if pending <= pool.config.GlobalSlots {
return
}
pendingBeforeCap := pending pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first // Assemble a spam order to penalize large transactors first
spammers := prque.New(nil) spammers := prque.New(nil)
@ -1111,6 +1246,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
} }
} }
} }
// If still above threshold, reduce to limit or min allowance // If still above threshold, reduce to limit or min allowance
if pending > pool.config.GlobalSlots && len(offenders) > 0 { if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
@ -1139,13 +1275,18 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
} }
} }
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending)) pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
} }
// If we've queued more transactions than the hard limit, drop oldest ones
// truncateQueue drops the oldes transactions in the queue if the pool is above the global queue limit.
func (pool *TxPool) truncateQueue() {
queued := uint64(0) queued := uint64(0)
for _, list := range pool.queue { for _, list := range pool.queue {
queued += uint64(list.Len()) queued += uint64(list.Len())
} }
if queued > pool.config.GlobalQueue { if queued <= pool.config.GlobalQueue {
return
}
// Sort all accounts with queued transactions by heartbeat // Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue)) addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue { for addr := range pool.queue {
@ -1179,7 +1320,6 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
queuedRateLimitMeter.Mark(1) queuedRateLimitMeter.Mark(1)
} }
} }
}
} }
// demoteUnexecutables removes invalid and processed transactions from the pools // demoteUnexecutables removes invalid and processed transactions from the pools
@ -1224,7 +1364,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Error("Demoting invalidated transaction", "hash", hash) log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx) pool.enqueueTx(hash, tx)
} }
pendingCounter.Inc(int64(len(gapped))) pendingCounter.Dec(int64(len(gapped)))
} }
// Delete the entire queue entry if it became empty. // Delete the entire queue entry if it became empty.
if list.Empty() { if list.Empty() {
@ -1256,11 +1396,15 @@ type accountSet struct {
// newAccountSet creates a new address set with an associated signer for sender // newAccountSet creates a new address set with an associated signer for sender
// derivations. // derivations.
func newAccountSet(signer types.Signer) *accountSet { func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
return &accountSet{ as := &accountSet{
accounts: make(map[common.Address]struct{}), accounts: make(map[common.Address]struct{}),
signer: signer, signer: signer,
} }
for _, addr := range addrs {
as.add(addr)
}
return as
} }
// contains checks if a given address is contained within the set. // contains checks if a given address is contained within the set.
@ -1284,6 +1428,13 @@ func (as *accountSet) add(addr common.Address) {
as.cache = nil as.cache = nil
} }
// addTx adds the sender of tx into the set.
func (as *accountSet) addTx(tx *types.Transaction) {
if addr, err := types.Sender(as.signer, tx); err == nil {
as.add(addr)
}
}
// flatten returns the list of addresses within this set, also caching it for later // flatten returns the list of addresses within this set, also caching it for later
// reuse. The returned slice should not be changed! // reuse. The returned slice should not be changed!
func (as *accountSet) flatten() []common.Address { func (as *accountSet) flatten() []common.Address {
@ -1297,6 +1448,14 @@ func (as *accountSet) flatten() []common.Address {
return *as.cache return *as.cache
} }
// merge adds all addresses from the 'other' set into 'as'.
func (as *accountSet) merge(other *accountSet) {
for addr := range other.accounts {
as.accounts[addr] = struct{}{}
}
as.cache = nil
}
// txLookup is used internally by TxPool to track transactions while allowing lookup without // txLookup is used internally by TxPool to track transactions while allowing lookup without
// mutex contention. // mutex contention.
// //

@ -200,7 +200,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
t.Fatalf("Invalid nonce, want 0, got %d", nonce) t.Fatalf("Invalid nonce, want 0, got %d", nonce)
} }
pool.AddRemotes(types.Transactions{tx0, tx1}) pool.addRemotesSync([]*types.Transaction{tx0, tx1})
nonce = pool.State().GetNonce(address) nonce = pool.State().GetNonce(address)
if nonce != 2 { if nonce != 2 {
@ -209,8 +209,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
// trigger state change in the background // trigger state change in the background
trigger = true trigger = true
<-pool.requestReset(nil, nil)
pool.lockedReset(nil, nil)
_, err := pool.Pending() _, err := pool.Pending()
if err != nil { if err != nil {
@ -268,10 +267,10 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, 100, key) tx := transaction(0, 100, key)
from, _ := deriveSender(tx) from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1000)) pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables([]common.Address{from}) pool.enqueueTx(tx.Hash(), tx)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending)) t.Error("expected valid txs to be 1 is", len(pool.pending))
} }
@ -280,33 +279,36 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx) from, _ = deriveSender(tx)
pool.currentState.SetNonce(from, 2) pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables([]common.Address{from})
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool") t.Error("expected transaction to be in tx pool")
} }
if len(pool.queue) > 0 { if len(pool.queue) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue)) t.Error("expected transaction queue to be empty. is", len(pool.queue))
} }
}
pool, key = setupTxPool() func TestTransactionQueue2(t *testing.T) {
t.Parallel()
pool, key := setupTxPool()
defer pool.Stop() defer pool.Stop()
tx1 := transaction(0, 100, key) tx1 := transaction(0, 100, key)
tx2 := transaction(10, 100, key) tx2 := transaction(10, 100, key)
tx3 := transaction(11, 100, key) tx3 := transaction(11, 100, key)
from, _ = deriveSender(tx1) from, _ := deriveSender(tx1)
pool.currentState.AddBalance(from, big.NewInt(1000)) pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil) pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3) pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables([]common.Address{from}) pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending)) t.Error("expected pending length to be 1, got", len(pool.pending))
} }
if pool.queue[from].Len() != 2 { if pool.queue[from].Len() != 2 {
t.Error("expected len(queue) == 2, got", pool.queue[from].Len()) t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
@ -339,7 +341,7 @@ func TestTransactionChainFork(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000)) statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
} }
resetState() resetState()
@ -368,7 +370,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000)) statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
} }
resetState() resetState()
@ -384,16 +386,17 @@ func TestTransactionDoubleNonce(t *testing.T) {
if replace, err := pool.add(tx2, false); err != nil || !replace { if replace, err := pool.add(tx2, false); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
} }
pool.promoteExecutables([]common.Address{addr}) <-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
} }
// Add the third transaction and ensure it's not saved (smaller price) // Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false) pool.add(tx3, false)
pool.promoteExecutables([]common.Address{addr}) <-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
@ -439,7 +442,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey) addr := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.SetNonce(addr, n) pool.currentState.SetNonce(addr, n)
pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
tx := transaction(n, 100000, key) tx := transaction(n, 100000, key)
if err := pool.AddRemote(tx); err != nil { if err := pool.AddRemote(tx); err != nil {
@ -447,7 +450,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
} }
// simulate some weird re-order of transactions and missing nonce(s) // simulate some weird re-order of transactions and missing nonce(s)
pool.currentState.SetNonce(addr, n-1) pool.currentState.SetNonce(addr, n-1)
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
if fn := pool.pendingState.GetNonce(addr); fn != n-1 { if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n-1, fn) t.Errorf("expected nonce to be %d, got %d", n-1, fn)
} }
@ -491,7 +494,7 @@ func TestTransactionDropping(t *testing.T) {
if pool.all.Count() != 6 { if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
} }
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
if pool.pending[account].Len() != 3 { if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
} }
@ -503,7 +506,7 @@ func TestTransactionDropping(t *testing.T) {
} }
// Reduce the balance of the account, and check that invalidated transactions are dropped // Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650)) pool.currentState.AddBalance(account, big.NewInt(-650))
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0) t.Errorf("funded pending transaction missing: %v", tx0)
@ -528,7 +531,7 @@ func TestTransactionDropping(t *testing.T) {
} }
// Reduce the block gas limit, check that invalidated transactions are dropped // Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100 pool.chain.(*testBlockChain).gasLimit = 100
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0) t.Errorf("funded pending transaction missing: %v", tx0)
@ -584,7 +587,7 @@ func TestTransactionPostponing(t *testing.T) {
txs = append(txs, tx) txs = append(txs, tx)
} }
} }
for i, err := range pool.AddRemotes(txs) { for i, err := range pool.addRemotesSync(txs) {
if err != nil { if err != nil {
t.Fatalf("tx %d: failed to add transactions: %v", i, err) t.Fatalf("tx %d: failed to add transactions: %v", i, err)
} }
@ -599,7 +602,7 @@ func TestTransactionPostponing(t *testing.T) {
if pool.all.Count() != len(txs) { if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
} }
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
} }
@ -613,7 +616,7 @@ func TestTransactionPostponing(t *testing.T) {
for _, addr := range accs { for _, addr := range accs {
pool.currentState.AddBalance(addr, big.NewInt(-1)) pool.currentState.AddBalance(addr, big.NewInt(-1))
} }
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
// The first account's first transaction remains valid, check that subsequent // The first account's first transaction remains valid, check that subsequent
// ones are either filtered out, or queued up for later. // ones are either filtered out, or queued up for later.
@ -680,12 +683,10 @@ func TestTransactionGapFilling(t *testing.T) {
defer sub.Unsubscribe() defer sub.Unsubscribe()
// Create a pending and a queued transaction with a nonce-gap in between // Create a pending and a queued transaction with a nonce-gap in between
if err := pool.AddRemote(transaction(0, 100000, key)); err != nil { pool.addRemotesSync([]*types.Transaction{
t.Fatalf("failed to add pending transaction: %v", err) transaction(0, 100000, key),
} transaction(2, 100000, key),
if err := pool.AddRemote(transaction(2, 100000, key)); err != nil { })
t.Fatalf("failed to add queued transaction: %v", err)
}
pending, queued := pool.Stats() pending, queued := pool.Stats()
if pending != 1 { if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
@ -700,7 +701,7 @@ func TestTransactionGapFilling(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err) t.Fatalf("pool internal state corrupted: %v", err)
} }
// Fill the nonce gap and ensure all transactions become pending // Fill the nonce gap and ensure all transactions become pending
if err := pool.AddRemote(transaction(1, 100000, key)); err != nil { if err := pool.addRemoteSync(transaction(1, 100000, key)); err != nil {
t.Fatalf("failed to add gapped transaction: %v", err) t.Fatalf("failed to add gapped transaction: %v", err)
} }
pending, queued = pool.Stats() pending, queued = pool.Stats()
@ -732,7 +733,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped // Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil { if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err) t.Fatalf("tx %d: failed to add transaction: %v", i, err)
} }
if len(pool.pending) != 0 { if len(pool.pending) != 0 {
@ -799,7 +800,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
nonces[addr]++ nonces[addr]++
} }
// Import the batch and verify that limits have been enforced // Import the batch and verify that limits have been enforced
pool.AddRemotes(txs) pool.addRemotesSync(txs)
queued := 0 queued := 0
for addr, list := range pool.queue { for addr, list := range pool.queue {
@ -932,7 +933,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped // Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil { if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err) t.Fatalf("tx %d: failed to add transaction: %v", i, err)
} }
if pool.pending[account].Len() != int(i)+1 { if pool.pending[account].Len() != int(i)+1 {
@ -953,57 +954,6 @@ func TestTransactionPendingLimiting(t *testing.T) {
} }
} }
// Tests that the transaction limits are enforced the same way irrelevant whether
// the transactions are added one by one or in batches.
func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) }
func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) }
func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
t.Parallel()
// Add a batch of transactions to a pool one by one
pool1, key1 := setupTxPool()
defer pool1.Stop()
account1, _ := deriveSender(transaction(0, 0, key1))
pool1.currentState.AddBalance(account1, big.NewInt(1000000))
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool1.AddRemote(transaction(origin+i, 100000, key1)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
}
// Add a batch of transactions to a pool in one big batch
pool2, key2 := setupTxPool()
defer pool2.Stop()
account2, _ := deriveSender(transaction(0, 0, key2))
pool2.currentState.AddBalance(account2, big.NewInt(1000000))
txs := []*types.Transaction{}
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
txs = append(txs, transaction(origin+i, 100000, key2))
}
pool2.AddRemotes(txs)
// Ensure the batch optimization honors the same pool mechanics
if len(pool1.pending) != len(pool2.pending) {
t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending))
}
if len(pool1.queue) != len(pool2.queue) {
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
}
if pool1.all.Count() != pool2.all.Count() {
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool1.all.Count(), pool2.all.Count())
}
if err := validateTxPoolInternals(pool1); err != nil {
t.Errorf("pool 1 internal state corrupted: %v", err)
}
if err := validateTxPoolInternals(pool2); err != nil {
t.Errorf("pool 2 internal state corrupted: %v", err)
}
}
// Tests that if the transaction count belonging to multiple accounts go above // Tests that if the transaction count belonging to multiple accounts go above
// some hard threshold, the higher transactions are dropped to prevent DOS // some hard threshold, the higher transactions are dropped to prevent DOS
// attacks. // attacks.
@ -1038,7 +988,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
} }
} }
// Import the batch and verify that limits have been enforced // Import the batch and verify that limits have been enforced
pool.AddRemotes(txs) pool.addRemotesSync(txs)
pending := 0 pending := 0
for _, list := range pool.pending { for _, list := range pool.pending {
@ -1118,7 +1068,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
} }
} }
// Import the batch and verify that limits have been enforced // Import the batch and verify that limits have been enforced
pool.AddRemotes(txs) pool.addRemotesSync(txs)
for addr, list := range pool.pending { for addr, list := range pool.pending {
if list.Len() != int(config.AccountSlots) { if list.Len() != int(config.AccountSlots) {
@ -1174,7 +1124,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3]) ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up // Import the batch and that both pending and queued transactions match up
pool.AddRemotes(txs) pool.addRemotesSync(txs)
pool.AddLocal(ltx) pool.AddLocal(ltx)
pending, queued := pool.Stats() pending, queued := pool.Stats()
@ -1454,7 +1404,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
for i := uint64(0); i < config.GlobalSlots; i++ { for i := uint64(0); i < config.GlobalSlots; i++ {
txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0])) txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
} }
pool.AddRemotes(txs) pool.addRemotesSync(txs)
pending, queued := pool.Stats() pending, queued := pool.Stats()
if pending != int(config.GlobalSlots) { if pending != int(config.GlobalSlots) {
@ -1470,7 +1420,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err) t.Fatalf("pool internal state corrupted: %v", err)
} }
// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap // Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
t.Fatalf("failed to add well priced transaction: %v", err) t.Fatalf("failed to add well priced transaction: %v", err)
} }
pending, queued = pool.Stats() pending, queued = pool.Stats()
@ -1513,7 +1463,7 @@ func TestTransactionReplacement(t *testing.T) {
price := int64(100) price := int64(100)
threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100 threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap pending transaction: %v", err) t.Fatalf("failed to add original cheap pending transaction: %v", err)
} }
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced { if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced {
@ -1526,7 +1476,7 @@ func TestTransactionReplacement(t *testing.T) {
t.Fatalf("cheap replacement event firing failed: %v", err) t.Fatalf("cheap replacement event firing failed: %v", err)
} }
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
t.Fatalf("failed to add original proper pending transaction: %v", err) t.Fatalf("failed to add original proper pending transaction: %v", err)
} }
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced { if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced {
@ -1538,6 +1488,7 @@ func TestTransactionReplacement(t *testing.T) {
if err := validateEvents(events, 2); err != nil { if err := validateEvents(events, 2); err != nil {
t.Fatalf("proper replacement event firing failed: %v", err) t.Fatalf("proper replacement event firing failed: %v", err)
} }
// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) // Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil { if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap queued transaction: %v", err) t.Fatalf("failed to add original cheap queued transaction: %v", err)
@ -1615,7 +1566,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil { if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err) t.Fatalf("failed to add local transaction: %v", err)
} }
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err) t.Fatalf("failed to add remote transaction: %v", err)
} }
pending, queued := pool.Stats() pending, queued := pool.Stats()
@ -1653,7 +1604,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
} }
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed // Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
pool.lockedReset(nil, nil) <-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal) time.Sleep(2 * config.Rejournal)
pool.Stop() pool.Stop()
@ -1707,7 +1658,7 @@ func TestTransactionStatusCheck(t *testing.T) {
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only
// Import the transaction and ensure they are correctly added // Import the transaction and ensure they are correctly added
pool.AddRemotes(txs) pool.addRemotesSync(txs)
pending, queued := pool.Stats() pending, queued := pool.Stats()
if pending != 2 { if pending != 2 {
@ -1786,26 +1737,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
} }
} }
// Benchmarks the speed of iterative transaction insertion.
func BenchmarkPoolInsert(b *testing.B) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupTxPool()
defer pool.Stop()
account, _ := deriveSender(transaction(0, 0, key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
txs := make(types.Transactions, b.N)
for i := 0; i < b.N; i++ {
txs[i] = transaction(uint64(i), 100000, key)
}
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, tx := range txs {
pool.AddRemote(tx)
}
}
// Benchmarks the speed of batched transaction insertion. // Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) } func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }