Merge pull request #1669 from obscuren/tx-pool-auto-resend

core, xeth: chain reorg move missing transactions to transaction pool
This commit is contained in:
Jeffrey Wilcke 2015-09-21 11:45:59 -07:00
commit 7bf8e949e7
13 changed files with 231 additions and 57 deletions

@ -92,7 +92,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *eth
db, _ := ethdb.NewMemDatabase()
core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance))
core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)})
ks := crypto.NewKeyStorePlain(filepath.Join(tmp, "keystore"))
am := accounts.NewManager(ks)
conf := &eth.Config{

@ -134,7 +134,7 @@ func testEth(t *testing.T) (ethereum *eth.Ethereum, err error) {
db, _ := ethdb.NewMemDatabase()
// set up mock genesis with balance on the testAddress
core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance))
core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)})
// only use minimalistic stack with no networking
ethereum, err = eth.New(&eth.Config{

@ -162,7 +162,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
// Generate a chain of b.N blocks using the supplied block
// generator function.
genesis := WriteGenesisBlockForTesting(db, benchRootAddr, benchRootFunds)
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds})
chain := GenerateChain(genesis, db, b.N, gen)
// Time the insertion of the new chain.

@ -42,7 +42,7 @@ func ExampleGenerateChain() {
)
// Ensure that key1 has some funds in the genesis block.
genesis := WriteGenesisBlockForTesting(db, addr1, big.NewInt(1000000))
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr1, big.NewInt(1000000)})
// This call generates a chain of 5 blocks. The function runs for
// each block and adds different features to gen based on the

@ -569,18 +569,17 @@ func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, er
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
err := self.merge(cblock, block)
err := self.reorg(cblock, block)
if err != nil {
return NonStatTy, err
}
status = SplitStatTy
}
status = CanonStatTy
self.mu.Lock()
self.setTotalDifficulty(td)
self.insert(block)
self.mu.Unlock()
status = CanonStatTy
} else {
status = SideStatTy
}
@ -681,9 +680,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
txcount += len(block.Transactions())
// write the block to the chain and get the status
status, err := self.WriteBlock(block)
if err != nil {
@ -711,10 +712,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queue[i] = ChainSplitEvent{block, logs}
queueEvent.splitCount++
}
if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
stats.processed++
}
@ -729,20 +726,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return 0, nil
}
// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain.
func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) {
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
func (self *ChainManager) reorg(oldBlock, newBlock *types.Block) error {
self.mu.Lock()
defer self.mu.Unlock()
var (
newChain types.Blocks
commonBlock *types.Block
oldStart = oldBlock
newStart = newBlock
deletedTxs types.Transactions
)
// first reduce whoever is higher bound
if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain
for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) {
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
}
} else {
// reduce new chain and append new chain blocks for inserting later on
@ -751,10 +754,10 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
}
}
if oldBlock == nil {
return nil, fmt.Errorf("Invalid old chain")
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return nil, fmt.Errorf("Invalid new chain")
return fmt.Errorf("Invalid new chain")
}
numSplit := newBlock.Number()
@ -764,13 +767,14 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
break
}
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash())
if oldBlock == nil {
return nil, fmt.Errorf("Invalid old chain")
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return nil, fmt.Errorf("Invalid new chain")
return fmt.Errorf("Invalid new chain")
}
}
@ -779,18 +783,8 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
}
return newChain, nil
}
// merge merges two different chain to the new canonical chain
func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
newChain, err := self.diff(oldBlock, newBlock)
if err != nil {
return fmt.Errorf("chain reorg failed: %v", err)
}
var addedTxs types.Transactions
// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
self.mu.Lock()
for _, block := range newChain {
// insert the block in the canonical way, re-writing history
self.insert(block)
@ -798,8 +792,18 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
PutTransactions(self.chainDb, block, block.Transactions())
PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash()))
addedTxs = append(addedTxs, block.Transactions()...)
}
self.mu.Unlock()
// calculate the difference between deleted and added transactions
diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
for _, tx := range diff {
DeleteReceipt(self.chainDb, tx.Hash())
DeleteTransaction(self.chainDb, tx.Hash())
}
self.eventMux.Post(RemovedTransactionEvent{diff})
return nil
}

@ -30,8 +30,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/hashicorp/golang-lru"
@ -483,19 +485,115 @@ func TestInsertNonceError(t *testing.T) {
}
}
/*
func TestGenesisMismatch(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
var mux event.TypeMux
genesis := GenesisBlock(0, db)
_, err := NewChainManager(genesis, db, db, db, thePow(), &mux)
if err != nil {
t.Error(err)
// Tests that chain reorganizations handle transaction removals and reinsertions.
func TestChainTxReorgs(t *testing.T) {
params.MinGasLimit = big.NewInt(125000) // Minimum the gas limit may ever be.
params.GenesisGasLimit = big.NewInt(3141592) // Gas limit of the Genesis block.
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
key3, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
addr3 = crypto.PubkeyToAddress(key3.PublicKey)
db, _ = ethdb.NewMemDatabase()
)
genesis := WriteGenesisBlockForTesting(db,
GenesisAccount{addr1, big.NewInt(1000000)},
GenesisAccount{addr2, big.NewInt(1000000)},
GenesisAccount{addr3, big.NewInt(1000000)},
)
// Create two transactions shared between the chains:
// - postponed: transaction included at a later block in the forked chain
// - swapped: transaction included at the same block number in the forked chain
postponed, _ := types.NewTransaction(0, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1)
swapped, _ := types.NewTransaction(1, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1)
// Create two transactions that will be dropped by the forked chain:
// - pastDrop: transaction dropped retroactively from a past block
// - freshDrop: transaction dropped exactly at the block where the reorg is detected
var pastDrop, freshDrop *types.Transaction
// Create three transactions that will be added in the forked chain:
// - pastAdd: transaction added before the reorganiztion is detected
// - freshAdd: transaction added at the exact block the reorg is detected
// - futureAdd: transaction added after the reorg has already finished
var pastAdd, freshAdd, futureAdd *types.Transaction
chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
switch i {
case 0:
pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
gen.AddTx(pastDrop) // This transaction will be dropped in the fork from below the split point
gen.AddTx(postponed) // This transaction will be postponed till block #3 in the fork
case 2:
freshDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
gen.AddTx(freshDrop) // This transaction will be dropped in the fork from exactly at the split point
gen.AddTx(swapped) // This transaction will be swapped out at the exact height
gen.OffsetTime(9) // Lower the block difficulty to simulate a weaker chain
}
})
// Import the chain. This runs all block validation rules.
evmux := &event.TypeMux{}
chainman, _ := NewChainManager(db, FakePow{}, evmux)
chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux))
if i, err := chainman.InsertChain(chain); err != nil {
t.Fatalf("failed to insert original chain[%d]: %v", i, err)
}
genesis = GenesisBlock(1, db)
_, err = NewChainManager(genesis, db, db, db, thePow(), &mux)
if err == nil {
t.Error("expected genesis mismatch error")
// overwrite the old chain
chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
switch i {
case 0:
pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
gen.AddTx(pastAdd) // This transaction needs to be injected during reorg
case 2:
gen.AddTx(postponed) // This transaction was postponed from block #1 in the original chain
gen.AddTx(swapped) // This transaction was swapped from the exact current spot in the original chain
freshAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
gen.AddTx(freshAdd) // This transaction will be added exactly at reorg time
case 3:
futureAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
gen.AddTx(futureAdd) // This transaction will be added after a full reorg
}
})
if _, err := chainman.InsertChain(chain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// removed tx
for i, tx := range (types.Transactions{pastDrop, freshDrop}) {
if GetTransaction(db, tx.Hash()) != nil {
t.Errorf("drop %d: tx found while shouldn't have been", i)
}
if GetReceipt(db, tx.Hash()) != nil {
t.Errorf("drop %d: receipt found while shouldn't have been", i)
}
}
// added tx
for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) {
if GetTransaction(db, tx.Hash()) == nil {
t.Errorf("add %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
t.Errorf("add %d: expected receipt to be found", i)
}
}
// shared tx
for i, tx := range (types.Transactions{postponed, swapped}) {
if GetTransaction(db, tx.Hash()) == nil {
t.Errorf("share %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
t.Errorf("share %d: expected receipt to be found", i)
}
}
}
*/

@ -36,6 +36,9 @@ type NewBlockEvent struct{ Block *types.Block }
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
// RemovedTransactionEvent is posted when a reorg happens
type RemovedTransactionEvent struct{ Txs types.Transactions }
// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct {
Block *types.Block

@ -125,15 +125,27 @@ func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big
return block
}
func WriteGenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big.Int) *types.Block {
type GenesisAccount struct {
Address common.Address
Balance *big.Int
}
func WriteGenesisBlockForTesting(db ethdb.Database, accounts ...GenesisAccount) *types.Block {
accountJson := "{"
for i, account := range accounts {
if i != 0 {
accountJson += ","
}
accountJson += fmt.Sprintf(`"0x%x":{"balance":"0x%x"}`, account.Address, account.Balance.Bytes())
}
accountJson += "}"
testGenesis := fmt.Sprintf(`{
"nonce":"0x%x",
"gasLimit":"0x%x",
"difficulty":"0x%x",
"alloc": {
"0x%x":{"balance":"0x%x"}
}
}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), addr, balance.Bytes())
"alloc": %s
}`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), accountJson)
block, _ := WriteGenesisBlock(db, strings.NewReader(testGenesis))
return block
}

@ -81,7 +81,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
gasLimit: gasLimitFn,
minGasPrice: new(big.Int),
pendingState: state.ManageState(currentStateFn()),
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}),
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
}
go pool.eventLoop()
@ -93,16 +93,18 @@ func (pool *TxPool) eventLoop() {
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
for ev := range pool.events.Chan() {
pool.mu.Lock()
switch ev := ev.(type) {
case ChainHeadEvent:
pool.mu.Lock()
pool.resetState()
pool.mu.Unlock()
case GasPriceChanged:
pool.mu.Lock()
pool.minGasPrice = ev.Price
pool.mu.Unlock()
case RemovedTransactionEvent:
pool.AddTransactions(ev.Txs)
}
pool.mu.Unlock()
}
}

@ -238,3 +238,15 @@ func TestNonceRecovery(t *testing.T) {
t.Errorf("expected nonce to be %d, got %d", n+1, fn)
}
}
func TestRemovedTxEvent(t *testing.T) {
pool, key := setupTxPool()
tx := transaction(0, big.NewInt(1000000), key)
from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1000000000000))
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
pool.eventMux.Post(ChainHeadEvent{nil})
if len(pool.pending) != 1 {
t.Error("expected 1 pending tx, got", len(pool.pending))
}
}

@ -77,6 +77,22 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio
}
}
func DeleteTransaction(db ethdb.Database, txHash common.Hash) {
db.Delete(txHash[:])
}
func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction {
data, _ := db.Get(txhash[:])
if len(data) != 0 {
var tx types.Transaction
if err := rlp.DecodeBytes(data, &tx); err != nil {
return nil
}
return &tx
}
return nil
}
// PutReceipts stores the receipts in the current database
func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
batch := new(leveldb.Batch)
@ -107,6 +123,11 @@ func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
return nil
}
// Delete a receipts from the database
func DeleteReceipt(db ethdb.Database, txHash common.Hash) {
db.Delete(append(receiptsPre, txHash[:]...))
}
// GetReceipt returns a receipt by hash
func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
data, _ := db.Get(append(receiptsPre, txHash[:]...))

@ -272,14 +272,36 @@ func (tx *Transaction) String() string {
// Transaction slice type for basic sorting.
type Transactions []*Transaction
func (s Transactions) Len() int { return len(s) }
// Len returns the length of s
func (s Transactions) Len() int { return len(s) }
// Swap swaps the i'th and the j'th element in s
func (s Transactions) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// GetRlp implements Rlpable and returns the i'th element of s in rlp
func (s Transactions) GetRlp(i int) []byte {
enc, _ := rlp.EncodeToBytes(s[i])
return enc
}
// Returns a new set t which is the difference between a to b
func TxDifference(a, b Transactions) (keep Transactions) {
keep = make(Transactions, 0, len(a))
remove := make(map[common.Hash]struct{})
for _, tx := range b {
remove[tx.Hash()] = struct{}{}
}
for _, tx := range a {
if _, ok := remove[tx.Hash()]; !ok {
keep = append(keep, tx)
}
}
return keep
}
type TxByNonce struct{ Transactions }
func (s TxByNonce) Less(i, j int) bool {

@ -33,7 +33,7 @@ func newTestProtocolManager(blocks int, generator func(int, *core.BlockGen), new
evmux = new(event.TypeMux)
pow = new(core.FakePow)
db, _ = ethdb.NewMemDatabase()
genesis = core.WriteGenesisBlockForTesting(db, testBankAddress, testBankFunds)
genesis = core.WriteGenesisBlockForTesting(db, core.GenesisAccount{testBankAddress, testBankFunds})
chainman, _ = core.NewChainManager(db, pow, evmux)
blockproc = core.NewBlockProcessor(db, pow, chainman, evmux)
)