Merge pull request #1223 from obscuren/tx-pool-vroooooom

core: fixed race condition in the transaction pool
This commit is contained in:
Jeffrey Wilcke 2015-06-10 12:10:18 -07:00
commit acb59f3243
3 changed files with 32 additions and 41 deletions

@ -6,8 +6,8 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
// "github.com/ethereum/go-ethereum/crypto" // "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
) )
@ -76,8 +76,5 @@ func NewTestManager() *TestManager {
// testManager.blockChain = NewChainManager(testManager) // testManager.blockChain = NewChainManager(testManager)
// testManager.stateManager = NewStateManager(testManager) // testManager.stateManager = NewStateManager(testManager)
// Start the tx pool
testManager.txPool.Start()
return testManager return testManager
} }

@ -50,7 +50,7 @@ type TxPool struct {
} }
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
return &TxPool{ pool := &TxPool{
pending: make(map[common.Hash]*types.Transaction), pending: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]map[common.Hash]*types.Transaction), queue: make(map[common.Address]map[common.Hash]*types.Transaction),
quit: make(chan bool), quit: make(chan bool),
@ -58,14 +58,17 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
currentState: currentStateFn, currentState: currentStateFn,
gasLimit: gasLimitFn, gasLimit: gasLimitFn,
pendingState: state.ManageState(currentStateFn()), pendingState: state.ManageState(currentStateFn()),
events: eventMux.Subscribe(ChainEvent{}),
} }
go pool.eventLoop()
return pool
} }
func (pool *TxPool) Start() { func (pool *TxPool) eventLoop() {
// Track chain events. When a chain events occurs (new chain canon block) // Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine // we need to know the new state. The new state will help us determine
// the nonces in the managed state // the nonces in the managed state
pool.events = pool.eventMux.Subscribe(ChainEvent{})
for _ = range pool.events.Chan() { for _ = range pool.events.Chan() {
pool.mu.Lock() pool.mu.Lock()
@ -100,7 +103,6 @@ func (pool *TxPool) resetState() {
} }
func (pool *TxPool) Stop() { func (pool *TxPool) Stop() {
pool.pending = make(map[common.Hash]*types.Transaction)
close(pool.quit) close(pool.quit)
pool.events.Unsubscribe() pool.events.Unsubscribe()
glog.V(logger.Info).Infoln("TX Pool stopped") glog.V(logger.Info).Infoln("TX Pool stopped")
@ -169,15 +171,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil return nil
} }
// validate and queue transactions.
func (self *TxPool) add(tx *types.Transaction) error { func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash() hash := tx.Hash()
/* XXX I'm unsure about this. This is extremely dangerous and may result
in total black listing of certain transactions
if self.invalidHashes.Has(hash) {
return fmt.Errorf("Invalid transaction (%x)", hash[:4])
}
*/
if self.pending[hash] != nil { if self.pending[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4]) return fmt.Errorf("Known transaction (%x)", hash[:4])
} }
@ -207,6 +204,30 @@ func (self *TxPool) add(tx *types.Transaction) error {
return nil return nil
} }
// queueTx will queue an unknown transaction
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
self.queue[from] = make(map[common.Hash]*types.Transaction)
}
self.queue[from][hash] = tx
}
// addTx will add a transaction to the pending (processable queue) list of transactions
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.pending[hash]; !ok {
pool.pending[hash] = tx
// Increment the nonce on the pending state. This can only happen if
// the nonce is +1 to the previous one.
pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
go pool.eventMux.Post(TxPreEvent{tx})
}
}
// Add queues a single transaction in the pool if it is valid. // Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error { func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock() self.mu.Lock()
@ -290,28 +311,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
} }
} }
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated
if self.queue[from] == nil {
self.queue[from] = make(map[common.Hash]*types.Transaction)
}
self.queue[from][hash] = tx
}
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.pending[hash]; !ok {
pool.pending[hash] = tx
// Increment the nonce on the pending state. This can only happen if
// the nonce is +1 to the previous one.
pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
go pool.eventMux.Post(TxPreEvent{tx})
}
}
// checkQueue moves transactions that have become processable to main pool. // checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() { func (pool *TxPool) checkQueue() {
state := pool.pendingState state := pool.pendingState

@ -466,8 +466,6 @@ func (s *Ethereum) Start() error {
s.StartAutoDAG() s.StartAutoDAG()
} }
// Start services
go s.txPool.Start()
s.protocolManager.Start() s.protocolManager.Start()
if s.whisper != nil { if s.whisper != nil {
@ -513,9 +511,6 @@ func (s *Ethereum) StartForTest() {
ClientString: s.net.Name, ClientString: s.net.Name,
ProtocolVersion: ProtocolVersion, ProtocolVersion: ProtocolVersion,
}) })
// Start services
s.txPool.Start()
} }
// AddPeer connects to the given node and maintains the connection until the // AddPeer connects to the given node and maintains the connection until the