From da7d57e07c04dcbb7cc20b35f6606ef3f4c400e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 4 Sep 2017 22:35:00 +0300 Subject: [PATCH] core: make txpool operate on immutable state --- core/blockchain.go | 12 -- core/error.go | 4 + core/state_transition.go | 8 +- core/tx_list.go | 1 + core/tx_pool.go | 203 +++++++++++++++------------ core/tx_pool_test.go | 281 +++++++++++++------------------------ eth/api_backend.go | 8 -- internal/ethapi/api.go | 1 - internal/ethapi/backend.go | 1 - miner/worker.go | 21 ++- 10 files changed, 233 insertions(+), 307 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index f3ca4e08cf..0bb12fc190 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -81,7 +81,6 @@ type BlockChain struct { hc *HeaderChain chainDb ethdb.Database - rmTxFeed event.Feed rmLogsFeed event.Feed chainFeed event.Feed chainSideFeed event.Feed @@ -1194,15 +1193,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for _, tx := range diff { DeleteTxLookupEntry(bc.chainDb, tx.Hash()) } - // Must be posted in a goroutine because of the transaction pool trying - // to acquire the chain manager lock - if len(diff) > 0 { - go bc.rmTxFeed.Send(RemovedTransactionEvent{diff}) - } if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } - if len(oldChain) > 0 { go func() { for _, block := range oldChain { @@ -1401,11 +1394,6 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } -// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent. -func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { - return bc.scope.Track(bc.rmTxFeed.Subscribe(ch)) -} - // SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) diff --git a/core/error.go b/core/error.go index 9ac4fff514..410eca1e1e 100644 --- a/core/error.go +++ b/core/error.go @@ -28,4 +28,8 @@ var ( // ErrBlacklistedHash is returned if a block to import is on the blacklist. ErrBlacklistedHash = errors.New("blacklisted hash") + + // ErrNonceTooHigh is returned if the nonce of a transaction is higher than the + // next one expected based on the local chain. + ErrNonceTooHigh = errors.New("nonce too high") ) diff --git a/core/state_transition.go b/core/state_transition.go index bab4540be8..e7a0685893 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -18,7 +18,6 @@ package core import ( "errors" - "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -197,8 +196,11 @@ func (st *StateTransition) preCheck() error { // Make sure this transaction's nonce is correct if msg.CheckNonce() { - if n := st.state.GetNonce(sender.Address()); n != msg.Nonce() { - return fmt.Errorf("invalid nonce: have %d, expected %d", msg.Nonce(), n) + nonce := st.state.GetNonce(sender.Address()) + if nonce < msg.Nonce() { + return ErrNonceTooHigh + } else if nonce > msg.Nonce() { + return ErrNonceTooLow } } return st.buyGas() diff --git a/core/tx_list.go b/core/tx_list.go index 1087970fac..2935929d72 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -298,6 +298,7 @@ func (l *txList) Filter(costLimit, gasLimit *big.Int) (types.Transactions, types // If the list was strict, filter anything above the lowest nonce var invalids types.Transactions + if l.strict && len(removed) > 0 { lowest := uint64(math.MaxUint64) for _, tx := range removed { diff --git a/core/tx_pool.go b/core/tx_pool.go index d835c94d1a..f41fbe069a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -105,10 +105,11 @@ var ( // blockChain provides the state of blockchain and current gas limit to do // some pre checks in tx pool and event subscribers. type blockChain interface { - State() (*state.StateDB, error) - GasLimit() *big.Int + CurrentHeader() *types.Header SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription - SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription + + GetBlock(hash common.Hash, number uint64) *types.Block + StateAt(root common.Hash) (*state.StateDB, error) } // TxPoolConfig are the configuration parameters of the transaction pool. @@ -174,18 +175,19 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - blockChain blockChain - pendingState *state.ManagedState + chain blockChain gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription - rmTxCh chan RemovedTransactionEvent - rmTxSub event.Subscription signer types.Signer mu sync.RWMutex + currentState *state.StateDB // Current state in the blockchain head + pendingState *state.ManagedState // Pending state tracking virtual nonces + currentMaxGas *big.Int // Current gas limit for transaction caps + locals *accountSet // Set of local transaction to exepmt from evicion rules journal *txJournal // Journal of local transaction to back up to disk @@ -202,28 +204,26 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - blockChain: blockChain, - signer: types.NewEIP155Signer(chainconfig.ChainId), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: make(map[common.Hash]*types.Transaction), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), - pendingState: nil, + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainId), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: make(map[common.Hash]*types.Transaction), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) - pool.reset() + pool.reset(nil, chain.CurrentHeader()) // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { @@ -237,8 +237,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain } } // Subscribe events from blockchain - pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) - pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) + // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -264,31 +264,28 @@ func (pool *TxPool) loop() { journal := time.NewTicker(pool.config.Rejournal) defer journal.Stop() + // Track the previous head headers for transaction reorgs + head := pool.chain.CurrentHeader() + // Keep waiting for and reacting to the various events for { select { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: - pool.mu.Lock() if ev.Block != nil { + pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { pool.homestead = true } + pool.reset(head, ev.Block.Header()) + head = ev.Block.Header() + pool.mu.Unlock() } - pool.reset() - pool.mu.Unlock() // Be unsubscribed due to system stopped case <-pool.chainHeadSub.Err(): return - // Handle RemovedTransactionEvent - case ev := <-pool.rmTxCh: - pool.addTxs(ev.Txs, false) - // Be unsubscribed due to system stopped - case <-pool.rmTxSub.Err(): - return - // Handle stats reporting ticks case <-report.C: pool.mu.RLock() @@ -333,28 +330,76 @@ func (pool *TxPool) loop() { // lockedReset is a wrapper around reset to allow calling it in a thread safe // manner. This method is only ever used in the tester! -func (pool *TxPool) lockedReset() { +func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) { pool.mu.Lock() defer pool.mu.Unlock() - pool.reset() + pool.reset(oldHead, newHead) } // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *TxPool) reset() { - currentState, err := pool.blockChain.State() +func (pool *TxPool) reset(oldHead, newHead *types.Header) { + // If we're reorging an old state, reinject all dropped transactions + var reinject types.Transactions + + if oldHead != nil && oldHead.Hash() != newHead.ParentHash { + var discarded, included types.Transactions + + var ( + rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) + add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + ) + for rem.NumberU64() > add.NumberU64() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + } + for add.NumberU64() > rem.NumberU64() { + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + for rem.Hash() != add.Hash() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + reinject = types.TxDifference(discarded, included) + } + // Initialize the internal state to the current head + if newHead == nil { + newHead = pool.chain.CurrentHeader() // Special case during testing + } + statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { - log.Error("Failed reset txpool state", "err", err) + log.Error("Failed to reset txpool state", "err", err) return } - pool.pendingState = state.ManageState(currentState) + pool.currentState = statedb + pool.pendingState = state.ManageState(statedb) + pool.currentMaxGas = newHead.GasLimit + + // Inject any transactions discarded due to reorgs + log.Debug("Reinjecting stale transactions", "count", len(reinject)) + pool.addTxsLocked(reinject, false) // validate the pool of pending transactions, this will remove // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.demoteUnexecutables(currentState) + pool.demoteUnexecutables() // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { @@ -363,16 +408,16 @@ func (pool *TxPool) reset() { } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.promoteExecutables(currentState, nil) + pool.promoteExecutables(nil) } // Stop terminates the transaction pool. func (pool *TxPool) Stop() { // Unsubscribe all subscriptions registered from txpool pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain pool.chainHeadSub.Unsubscribe() - pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -442,8 +487,8 @@ func (pool *TxPool) stats() (int, int) { // Content retrieves the data content of the transaction pool, returning all the // pending as well as queued transactions, grouped by account and sorted by nonce. func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { - pool.mu.RLock() - defer pool.mu.RUnlock() + pool.mu.Lock() + defer pool.mu.Unlock() pending := make(map[common.Address]types.Transactions) for addr, list := range pool.pending { @@ -499,7 +544,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { + if pool.currentMaxGas.Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -513,16 +558,12 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.blockChain.State() - if err != nil { - return err - } - if currentState.GetNonce(from) > tx.Nonce() { + if pool.currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } // Transactor should have enough funds to cover the costs // cost == V + GP * GL - if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { + if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) @@ -721,12 +762,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.blockChain.State() - if err != nil { - return err - } from, _ := types.Sender(pool.signer, tx) // already validated - pool.promoteExecutables(state, []common.Address{from}) + pool.promoteExecutables([]common.Address{from}) } return nil } @@ -736,6 +773,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() + return pool.addTxsLocked(txs, local) +} + +// addTxsLocked attempts to queue a batch of transactions if they are valid, +// whilst assuming the transaction pool lock is already held. +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) for _, tx := range txs { @@ -748,15 +791,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.blockChain.State() - if err != nil { - return err - } addrs := make([]common.Address, 0, len(dirty)) for addr, _ := range dirty { addrs = append(addrs, addr) } - pool.promoteExecutables(state, addrs) + pool.promoteExecutables(addrs) } return nil } @@ -770,24 +809,6 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { return pool.all[hash] } -// Remove removes the transaction with the given hash from the pool. -func (pool *TxPool) Remove(hash common.Hash) { - pool.mu.Lock() - defer pool.mu.Unlock() - - pool.removeTx(hash) -} - -// RemoveBatch removes all given transactions from the pool. -func (pool *TxPool) RemoveBatch(txs types.Transactions) { - pool.mu.Lock() - defer pool.mu.Unlock() - - for _, tx := range txs { - pool.removeTx(tx.Hash()) - } -} - // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { @@ -834,9 +855,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.blockChain.GasLimit() - +func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -851,14 +870,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) - for _, tx := range list.Forward(state.GetNonce(addr)) { + for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { hash := tx.Hash() log.Trace("Removed old queued transaction", "hash", hash) delete(pool.all, hash) pool.priced.Removed() } // Drop all transactions that are too costly (low balance or out of gas) - drops, _ := list.Filter(state.GetBalance(addr), gaslimit) + drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable queued transaction", "hash", hash) @@ -1003,12 +1022,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.blockChain.GasLimit() - +func (pool *TxPool) demoteUnexecutables() { // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { - nonce := state.GetNonce(addr) + nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(nonce) { @@ -1018,7 +1035,7 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { pool.priced.Removed() } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later - drops, invalids := list.Filter(state.GetBalance(addr), gaslimit) + drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) @@ -1031,6 +1048,14 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } + // If there's a gap in front, warn (should never happen) and postpone all transactions + if list.Len() > 0 && list.txs.Get(nonce) == nil { + for _, tx := range list.Cap(0) { + hash := tx.Hash() + log.Error("Demoting invalidated transaction", "hash", hash) + pool.enqueueTx(hash, tx) + } + } // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.pending, addr) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index c1bcb1b2d6..cdd45b4b1d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -48,23 +48,24 @@ type testBlockChain struct { statedb *state.StateDB gasLimit *big.Int chainHeadFeed *event.Feed - rmTxFeed *event.Feed } -func (bc *testBlockChain) State() (*state.StateDB, error) { - return bc.statedb, nil -} - -func (bc *testBlockChain) GasLimit() *big.Int { - return new(big.Int).Set(bc.gasLimit) +func (bc *testBlockChain) CurrentHeader() *types.Header { + return &types.Header{ + GasLimit: bc.gasLimit, + } } func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { return bc.chainHeadFeed.Subscribe(ch) } -func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { - return bc.rmTxFeed.Subscribe(ch) +func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { + return types.NewBlock(bc.CurrentHeader(), nil, nil, nil) +} + +func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { + return bc.statedb, nil } func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { @@ -79,7 +80,7 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) @@ -159,7 +160,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) - blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger} + blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed)}, address, &trigger} tx0 := transaction(0, big.NewInt(100000), key) tx1 := transaction(1, big.NewInt(100000), key) @@ -182,7 +183,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { // trigger state change in the background trigger = true - pool.lockedReset() + pool.lockedReset(nil, nil) pendingTx, err := pool.Pending() if err != nil { @@ -205,20 +206,20 @@ func TestInvalidTransactions(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(from, big.NewInt(1)) + + pool.currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) - currentState.AddBalance(from, balance) + pool.currentState.AddBalance(from, balance) if err := pool.AddRemote(tx); err != ErrIntrinsicGas { t.Error("expected", ErrIntrinsicGas, "got", err) } - currentState.SetNonce(from, 1) - currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) + pool.currentState.SetNonce(from, 1) + pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) tx = transaction(0, big.NewInt(100000), key) if err := pool.AddRemote(tx); err != ErrNonceTooLow { t.Error("expected", ErrNonceTooLow) @@ -240,21 +241,20 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(from, big.NewInt(1000)) - pool.lockedReset() + pool.currentState.AddBalance(from, big.NewInt(1000)) + pool.lockedReset(nil, nil) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables(currentState, []common.Address{from}) + pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } tx = transaction(1, big.NewInt(100), key) from, _ = deriveSender(tx) - currentState.SetNonce(from, 2) + pool.currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables(currentState, []common.Address{from}) + pool.promoteExecutables([]common.Address{from}) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -270,15 +270,14 @@ func TestTransactionQueue(t *testing.T) { tx2 := transaction(10, big.NewInt(100), key) tx3 := transaction(11, big.NewInt(100), key) from, _ = deriveSender(tx1) - currentState, _ = pool.blockChain.State() - currentState.AddBalance(from, big.NewInt(1000)) - pool.lockedReset() + pool.currentState.AddBalance(from, big.NewInt(1000)) + pool.lockedReset(nil, nil) pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) - pool.promoteExecutables(currentState, []common.Address{from}) + pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { t.Error("expected tx pool to be 1, got", len(pool.pending)) @@ -288,45 +287,13 @@ func TestTransactionQueue(t *testing.T) { } } -func TestRemoveTx(t *testing.T) { - pool, key := setupTxPool() - defer pool.Stop() - - addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(addr, big.NewInt(1)) - - tx1 := transaction(0, big.NewInt(100), key) - tx2 := transaction(2, big.NewInt(100), key) - - pool.promoteTx(addr, tx1.Hash(), tx1) - pool.enqueueTx(tx2.Hash(), tx2) - - if len(pool.queue) != 1 { - t.Error("expected queue to be 1, got", len(pool.queue)) - } - if len(pool.pending) != 1 { - t.Error("expected pending to be 1, got", len(pool.pending)) - } - pool.Remove(tx1.Hash()) - pool.Remove(tx2.Hash()) - - if len(pool.queue) > 0 { - t.Error("expected queue to be 0, got", len(pool.queue)) - } - if len(pool.pending) > 0 { - t.Error("expected pending to be 0, got", len(pool.pending)) - } -} - func TestNegativeValue(t *testing.T) { pool, key := setupTxPool() defer pool.Stop() tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(from, big.NewInt(1)) + pool.currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) } @@ -340,10 +307,10 @@ func TestTransactionChainFork(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - currentState, _ := pool.blockChain.State() - currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.lockedReset() + statedb.AddBalance(addr, big.NewInt(100000000000000)) + + pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} + pool.lockedReset(nil, nil) } resetState() @@ -351,7 +318,7 @@ func TestTransactionChainFork(t *testing.T) { if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } - pool.RemoveBatch([]*types.Transaction{tx}) + pool.removeTx(tx.Hash()) // reset the pool's internal state resetState() @@ -368,10 +335,10 @@ func TestTransactionDoubleNonce(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - currentState, _ := pool.blockChain.State() - currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.lockedReset() + statedb.AddBalance(addr, big.NewInt(100000000000000)) + + pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} + pool.lockedReset(nil, nil) } resetState() @@ -387,8 +354,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - state, _ := pool.blockChain.State() - pool.promoteExecutables(state, []common.Address{addr}) + pool.promoteExecutables([]common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -397,7 +363,7 @@ func TestTransactionDoubleNonce(t *testing.T) { } // Add the third transaction and ensure it's not saved (smaller price) pool.add(tx3, false) - pool.promoteExecutables(state, []common.Address{addr}) + pool.promoteExecutables([]common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -415,8 +381,7 @@ func TestMissingNonce(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(addr, big.NewInt(100000000000000)) + pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, big.NewInt(100000), key) if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) @@ -432,47 +397,25 @@ func TestMissingNonce(t *testing.T) { } } -func TestNonceRecovery(t *testing.T) { +func TestTransactionNonceRecovery(t *testing.T) { const n = 10 pool, key := setupTxPool() defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.blockChain.State() - currentState.SetNonce(addr, n) - currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.lockedReset() + pool.currentState.SetNonce(addr, n) + pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) + pool.lockedReset(nil, nil) + tx := transaction(n, big.NewInt(100000), key) if err := pool.AddRemote(tx); err != nil { t.Error(err) } // simulate some weird re-order of transactions and missing nonce(s) - currentState.SetNonce(addr, n-1) - pool.lockedReset() - if fn := pool.pendingState.GetNonce(addr); fn != n+1 { - t.Errorf("expected nonce to be %d, got %d", n+1, fn) - } -} - -func TestRemovedTxEvent(t *testing.T) { - pool, key := setupTxPool() - defer pool.Stop() - - tx := transaction(0, big.NewInt(1000000), key) - from, _ := deriveSender(tx) - currentState, _ := pool.blockChain.State() - currentState.AddBalance(from, big.NewInt(1000000000000)) - pool.lockedReset() - blockChain, _ := pool.blockChain.(*testBlockChain) - blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}}) - blockChain.chainHeadFeed.Send(ChainHeadEvent{nil}) - // wait for handling events - <-time.After(500 * time.Millisecond) - if pool.pending[from].Len() != 1 { - t.Error("expected 1 pending tx, got", pool.pending[from].Len()) - } - if len(pool.all) != 1 { - t.Error("expected 1 total transactions, got", len(pool.all)) + pool.currentState.SetNonce(addr, n-1) + pool.lockedReset(nil, nil) + if fn := pool.pendingState.GetNonce(addr); fn != n-1 { + t.Errorf("expected nonce to be %d, got %d", n-1, fn) } } @@ -484,9 +427,7 @@ func TestTransactionDropping(t *testing.T) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000)) + pool.currentState.AddBalance(account, big.NewInt(1000)) // Add some pending and some queued transactions var ( @@ -514,7 +455,7 @@ func TestTransactionDropping(t *testing.T) { if len(pool.all) != 6 { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } - pool.lockedReset() + pool.lockedReset(nil, nil) if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } @@ -525,8 +466,8 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } // Reduce the balance of the account, and check that invalidated transactions are dropped - state.AddBalance(account, big.NewInt(-650)) - pool.lockedReset() + pool.currentState.AddBalance(account, big.NewInt(-650)) + pool.lockedReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -550,8 +491,8 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100) - pool.lockedReset() + pool.chain.(*testBlockChain).gasLimit = big.NewInt(100) + pool.lockedReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -579,9 +520,7 @@ func TestTransactionPostponing(t *testing.T) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000)) + pool.currentState.AddBalance(account, big.NewInt(1000)) // Add a batch consecutive pending transactions for validation txns := []*types.Transaction{} @@ -605,7 +544,7 @@ func TestTransactionPostponing(t *testing.T) { if len(pool.all) != len(txns) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } - pool.lockedReset() + pool.lockedReset(nil, nil) if pool.pending[account].Len() != len(txns) { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns)) } @@ -616,8 +555,8 @@ func TestTransactionPostponing(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } // Reduce the balance of the account, and check that transactions are reorganised - state.AddBalance(account, big.NewInt(-750)) - pool.lockedReset() + pool.currentState.AddBalance(account, big.NewInt(-750)) + pool.lockedReset(nil, nil) if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) @@ -655,10 +594,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) - pool.lockedReset() + pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { @@ -699,7 +635,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals @@ -709,12 +645,10 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } local := keys[len(keys)-1] @@ -790,7 +724,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.Lifetime = time.Second @@ -803,9 +737,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - state, _ := pool.blockChain.State() - state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) - state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) // Add the two transactions and ensure they both are queued up if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil { @@ -854,10 +787,7 @@ func TestTransactionPendingLimiting(t *testing.T) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) - pool.lockedReset() + pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -887,8 +817,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool1.Stop() account1, _ := deriveSender(transaction(0, big.NewInt(0), key1)) - state1, _ := pool1.blockChain.State() - state1.AddBalance(account1, big.NewInt(1000000)) + pool1.currentState.AddBalance(account1, big.NewInt(1000000)) for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil { @@ -900,8 +829,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool2.Stop() account2, _ := deriveSender(transaction(0, big.NewInt(0), key2)) - state2, _ := pool2.blockChain.State() - state2.AddBalance(account2, big.NewInt(1000000)) + pool2.currentState.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -934,7 +862,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 @@ -943,12 +871,10 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) @@ -981,7 +907,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.AccountSlots = 2 @@ -992,11 +918,9 @@ func TestTransactionCapClearsFromAll(t *testing.T) { defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - key, _ := crypto.GenerateKey() addr := crypto.PubkeyToAddress(key.PublicKey) - state.AddBalance(addr, big.NewInt(1000000)) + pool.currentState.AddBalance(addr, big.NewInt(1000000)) txs := types.Transactions{} for j := 0; j < int(config.GlobalSlots)*2; j++ { @@ -1016,7 +940,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 0 @@ -1025,12 +949,10 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) @@ -1065,18 +987,16 @@ func TestTransactionPoolRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } // Generate and queue a batch of transactions, both pending and queued txs := types.Transactions{} @@ -1147,18 +1067,16 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000)) } // Create transaction (both pending and queued) with a linearly growing gasprice for i := uint64(0); i < 500; i++ { @@ -1189,11 +1107,11 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { } } validate() - + // Reprice the pool and check that nothing is dropped pool.SetGasPrice(big.NewInt(2)) validate() - + pool.SetGasPrice(big.NewInt(2)) pool.SetGasPrice(big.NewInt(4)) pool.SetGasPrice(big.NewInt(8)) @@ -1210,7 +1128,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 2 @@ -1220,12 +1138,10 @@ func TestTransactionPoolUnderpricing(t *testing.T) { defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.blockChain.State() - keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } // Generate and queue a batch of transactions, both pending and queued txs := types.Transactions{} @@ -1298,16 +1214,14 @@ func TestTransactionReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a test account to add transactions with key, _ := crypto.GenerateKey() - - state, _ := pool.blockChain.State() - state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) price := int64(100) @@ -1378,7 +1292,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals @@ -1391,9 +1305,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - statedb, _ = pool.blockChain.State() - statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) - statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) // Add three local and a remote transactions and ensure they are queued up if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil { @@ -1421,7 +1334,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() @@ -1442,11 +1355,11 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { } // Bump the nonce temporarily and ensure the newly invalidated transaction is removed statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - pool.lockedReset() + pool.lockedReset(nil, nil) time.Sleep(2 * config.Rejournal) pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)} pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() @@ -1480,8 +1393,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) + pool.currentState.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { tx := transaction(uint64(i), big.NewInt(100000), key) @@ -1490,7 +1402,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables(state) + pool.demoteUnexecutables() } } @@ -1506,8 +1418,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) + pool.currentState.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { tx := transaction(uint64(1+i), big.NewInt(100000), key) @@ -1516,7 +1427,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.promoteExecutables(state, nil) + pool.promoteExecutables(nil) } } @@ -1527,8 +1438,7 @@ func BenchmarkPoolInsert(b *testing.B) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) + pool.currentState.AddBalance(account, big.NewInt(1000000)) txs := make(types.Transactions, b.N) for i := 0; i < b.N; i++ { @@ -1552,8 +1462,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.blockChain.State() - state.AddBalance(account, big.NewInt(1000000)) + pool.currentState.AddBalance(account, big.NewInt(1000000)) batches := make([]types.Transactions, b.N) for i := 0; i < b.N; i++ { diff --git a/eth/api_backend.go b/eth/api_backend.go index abf52326b6..19ef79f234 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -115,10 +115,6 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil } -func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription { - return b.eth.BlockChain().SubscribeRemovedTxEvent(ch) -} - func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) } @@ -143,10 +139,6 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return b.eth.txPool.AddLocal(signedTx) } -func (b *EthApiBackend) RemoveTx(txHash common.Hash) { - b.eth.txPool.Remove(txHash) -} - func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { pending, err := b.eth.txPool.Pending() if err != nil { diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 30710aaabd..0775749e70 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1265,7 +1265,6 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs SendTxAr if err != nil { return common.Hash{}, err } - s.b.RemoveTx(p.Hash()) if err = s.b.SendTx(ctx, signedTx); err != nil { return common.Hash{}, err } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index be17ffeae4..368fa48726 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -59,7 +59,6 @@ type Backend interface { // TxPool API SendTx(ctx context.Context, signedTx *types.Transaction) error - RemoveTx(txHash common.Hash) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) diff --git a/miner/worker.go b/miner/worker.go index 24e03be60a..5bac5d6e85 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -71,7 +71,6 @@ type Work struct { family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set tcount int // tx count in cycle - failedTxs types.Transactions Block *types.Block // the new block @@ -477,8 +476,6 @@ func (self *worker) commitNewWork() { txs := types.NewTransactionsByPriceAndNonce(pending) work.commitTransactions(self.mux, txs, self.chain, self.coinbase) - self.eth.TxPool().RemoveBatch(work.failedTxs) - // compute uncles for the new block. var ( uncles []*types.Header @@ -563,6 +560,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB log.Trace("Gas limit exceeded for current block", "sender", from) txs.Pop() + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + case nil: // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) @@ -570,10 +577,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB txs.Shift() default: - // Pop the current failed transaction without shifting in the next from the account - log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err) - env.failedTxs = append(env.failedTxs, tx) - txs.Pop() + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() } }