Reworked chain handling process

* Forks
* Rename
* Moved inserting of blocks & processing
* Added chain testing method for validating pieces of a **a** chain.
This commit is contained in:
obscuren 2014-11-04 12:46:33 +01:00
parent f4b717cb9d
commit 699dcaf65c
4 changed files with 126 additions and 69 deletions

@ -313,22 +313,10 @@ out:
// If caught up and just a new block has been propagated: // If caught up and just a new block has been propagated:
// sm.eth.EventMux().Post(NewBlockEvent{block}) // sm.eth.EventMux().Post(NewBlockEvent{block})
// otherwise process and don't emit anything // otherwise process and don't emit anything
var err error if len(blocks) > 0 {
for i, block := range blocks { chainManager := self.eth.ChainManager()
err = self.eth.BlockManager().Process(block) chain := chain.NewChain(blocks)
if err != nil { _, err := chainManager.TestChain(chain)
poollogger.Infoln(err)
poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
poollogger.Debugln(block)
blocks = blocks[i:]
break
}
self.Remove(block.Hash())
}
if err != nil { if err != nil {
self.Reset() self.Reset()
@ -338,6 +326,12 @@ out:
self.peer.StopWithReason(DiscBadPeer) self.peer.StopWithReason(DiscBadPeer)
self.td = ethutil.Big0 self.td = ethutil.Big0
self.peer = nil self.peer = nil
} else {
chainManager.InsertChain(chain)
for _, block := range blocks {
self.Remove(block.Hash())
}
}
} }
} }
} }

@ -102,7 +102,7 @@ func (self *BlockManager) Stop() {
func (self *BlockManager) updateThread() { func (self *BlockManager) updateThread() {
for ev := range self.events.Chan() { for ev := range self.events.Chan() {
for _, block := range ev.(Blocks) { for _, block := range ev.(Blocks) {
err := self.Process(block) _, err := self.Process(block)
if err != nil { if err != nil {
statelogger.Infoln(err) statelogger.Infoln(err)
statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
@ -208,25 +208,27 @@ done:
return receipts, handled, unhandled, erroneous, err return receipts, handled, unhandled, erroneous, err
} }
func (sm *BlockManager) Process(block *Block) (err error) { func (sm *BlockManager) Process(block *Block) (td *big.Int, err error) {
// Processing a blocks may never happen simultaneously // Processing a blocks may never happen simultaneously
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
if sm.bc.HasBlock(block.Hash()) { if sm.bc.HasBlock(block.Hash()) {
return nil return nil, nil
} }
if !sm.bc.HasBlock(block.PrevHash) { if !sm.bc.HasBlock(block.PrevHash) {
return ParentError(block.PrevHash) return nil, ParentError(block.PrevHash)
}
parent := sm.bc.GetBlock(block.PrevHash)
return sm.ProcessWithParent(block, parent)
} }
func (sm *BlockManager) ProcessWithParent(block, parent *Block) (td *big.Int, err error) {
sm.lastAttemptedBlock = block sm.lastAttemptedBlock = block
var ( state := parent.State()
parent = sm.bc.GetBlock(block.PrevHash)
state = parent.State()
)
// Defer the Undo on the Trie. If the block processing happened // Defer the Undo on the Trie. If the block processing happened
// we don't want to undo but since undo only happens on dirty // we don't want to undo but since undo only happens on dirty
@ -240,32 +242,32 @@ func (sm *BlockManager) Process(block *Block) (err error) {
txSha := DeriveSha(block.transactions) txSha := DeriveSha(block.transactions)
if bytes.Compare(txSha, block.TxSha) != 0 { if bytes.Compare(txSha, block.TxSha) != 0 {
return fmt.Errorf("Error validating transaction sha. Received %x, got %x", block.TxSha, txSha) return nil, fmt.Errorf("Error validating transaction sha. Received %x, got %x", block.TxSha, txSha)
} }
receipts, err := sm.ApplyDiff(state, parent, block) receipts, err := sm.ApplyDiff(state, parent, block)
if err != nil { if err != nil {
return err return nil, err
} }
receiptSha := DeriveSha(receipts) receiptSha := DeriveSha(receipts)
if bytes.Compare(receiptSha, block.ReceiptSha) != 0 { if bytes.Compare(receiptSha, block.ReceiptSha) != 0 {
return fmt.Errorf("Error validating receipt sha. Received %x, got %x", block.ReceiptSha, receiptSha) return nil, fmt.Errorf("Error validating receipt sha. Received %x, got %x", block.ReceiptSha, receiptSha)
} }
// Block validation // Block validation
if err = sm.ValidateBlock(block); err != nil { if err = sm.ValidateBlock(block, parent); err != nil {
statelogger.Errorln("Error validating block:", err) statelogger.Errorln("Error validating block:", err)
return err return nil, err
} }
if err = sm.AccumelateRewards(state, block, parent); err != nil { if err = sm.AccumelateRewards(state, block, parent); err != nil {
statelogger.Errorln("Error accumulating reward", err) statelogger.Errorln("Error accumulating reward", err)
return err return nil, err
} }
if bytes.Compare(CreateBloom(block), block.LogsBloom) != 0 { if bytes.Compare(CreateBloom(block), block.LogsBloom) != 0 {
return errors.New("Unable to replicate block's bloom") return nil, errors.New("Unable to replicate block's bloom")
} }
state.Update() state.Update()
@ -276,27 +278,22 @@ func (sm *BlockManager) Process(block *Block) (err error) {
} }
// Calculate the new total difficulty and sync back to the db // Calculate the new total difficulty and sync back to the db
if sm.CalculateTD(block) { if td, ok := sm.CalculateTD(block); ok {
// Sync the current block's state to the database and cancelling out the deferred Undo // Sync the current block's state to the database and cancelling out the deferred Undo
state.Sync() state.Sync()
// Add the block to the chain
sm.bc.Add(block)
// TODO at this point we should also insert LOGS in to a database // TODO at this point we should also insert LOGS in to a database
sm.transState = state.Copy() sm.transState = state.Copy()
statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4])
state.Manifest().Reset() state.Manifest().Reset()
sm.eth.TxPool().RemoveSet(block.Transactions()) sm.eth.TxPool().RemoveSet(block.Transactions())
} else {
statelogger.Errorln("total diff failed")
}
return nil return td, nil
} else {
return nil, errors.New("total diff failed")
}
} }
func (sm *BlockManager) ApplyDiff(state *state.State, parent, block *Block) (receipts Receipts, err error) { func (sm *BlockManager) ApplyDiff(state *state.State, parent, block *Block) (receipts Receipts, err error) {
@ -312,7 +309,7 @@ func (sm *BlockManager) ApplyDiff(state *state.State, parent, block *Block) (rec
return receipts, nil return receipts, nil
} }
func (sm *BlockManager) CalculateTD(block *Block) bool { func (sm *BlockManager) CalculateTD(block *Block) (*big.Int, bool) {
uncleDiff := new(big.Int) uncleDiff := new(big.Int)
for _, uncle := range block.Uncles { for _, uncle := range block.Uncles {
uncleDiff = uncleDiff.Add(uncleDiff, uncle.Difficulty) uncleDiff = uncleDiff.Add(uncleDiff, uncle.Difficulty)
@ -326,30 +323,19 @@ func (sm *BlockManager) CalculateTD(block *Block) bool {
// The new TD will only be accepted if the new difficulty is // The new TD will only be accepted if the new difficulty is
// is greater than the previous. // is greater than the previous.
if td.Cmp(sm.bc.TD) > 0 { if td.Cmp(sm.bc.TD) > 0 {
// Set the new total difficulty back to the block chain return td, true
sm.bc.SetTotalDifficulty(td)
return true // Set the new total difficulty back to the block chain
//sm.bc.SetTotalDifficulty(td)
} }
return false return nil, false
} }
// Validates the current block. Returns an error if the block was invalid, // Validates the current block. Returns an error if the block was invalid,
// an uncle or anything that isn't on the current block chain. // an uncle or anything that isn't on the current block chain.
// Validation validates easy over difficult (dagger takes longer time = difficult) // Validation validates easy over difficult (dagger takes longer time = difficult)
func (sm *BlockManager) ValidateBlock(block *Block) error { func (sm *BlockManager) ValidateBlock(block, parent *Block) error {
// Check each uncle's previous hash. In order for it to be valid
// is if it has the same block hash as the current
parent := sm.bc.GetBlock(block.PrevHash)
/*
for _, uncle := range block.Uncles {
if bytes.Compare(uncle.PrevHash,parent.PrevHash) != 0 {
return ValidationError("Mismatch uncle's previous hash. Expected %x, got %x",parent.PrevHash, uncle.PrevHash)
}
}
*/
expd := CalcDifficulty(block, parent) expd := CalcDifficulty(block, parent)
if expd.Cmp(block.Difficulty) < 0 { if expd.Cmp(block.Difficulty) < 0 {
return fmt.Errorf("Difficulty check failed for block %v, %v", block.Difficulty, expd) return fmt.Errorf("Difficulty check failed for block %v, %v", block.Difficulty, expd)

@ -2,6 +2,7 @@ package chain
import ( import (
"bytes" "bytes"
"container/list"
"fmt" "fmt"
"math/big" "math/big"
@ -86,7 +87,7 @@ func (bc *ChainManager) Reset() {
bc.genesisBlock.state.Trie.Sync() bc.genesisBlock.state.Trie.Sync()
// Prepare the genesis block // Prepare the genesis block
bc.Add(bc.genesisBlock) bc.add(bc.genesisBlock)
bc.CurrentBlock = bc.genesisBlock bc.CurrentBlock = bc.genesisBlock
bc.SetTotalDifficulty(ethutil.Big("0")) bc.SetTotalDifficulty(ethutil.Big("0"))
@ -191,9 +192,8 @@ func (bc *ChainManager) SetTotalDifficulty(td *big.Int) {
} }
// Add a block to the chain and record addition information // Add a block to the chain and record addition information
func (bc *ChainManager) Add(block *Block) { func (bc *ChainManager) add(block *Block) {
bc.writeBlockInfo(block) bc.writeBlockInfo(block)
// Prepare the genesis block
bc.CurrentBlock = block bc.CurrentBlock = block
bc.LastBlockHash = block.Hash() bc.LastBlockHash = block.Hash()
@ -201,6 +201,8 @@ func (bc *ChainManager) Add(block *Block) {
encodedBlock := block.RlpEncode() encodedBlock := block.RlpEncode()
ethutil.Config.Db.Put(block.Hash(), encodedBlock) ethutil.Config.Db.Put(block.Hash(), encodedBlock)
ethutil.Config.Db.Put([]byte("LastBlock"), encodedBlock) ethutil.Config.Db.Put([]byte("LastBlock"), encodedBlock)
chainlogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4])
} }
func (self *ChainManager) CalcTotalDiff(block *Block) (*big.Int, error) { func (self *ChainManager) CalcTotalDiff(block *Block) (*big.Int, error) {
@ -287,3 +289,75 @@ func (bc *ChainManager) Stop() {
chainlogger.Infoln("Stopped") chainlogger.Infoln("Stopped")
} }
} }
type link struct {
block *Block
td *big.Int
}
type BlockChain struct {
*list.List
}
func NewChain(blocks Blocks) *BlockChain {
chain := &BlockChain{list.New()}
for _, block := range blocks {
chain.PushBack(&link{block, nil})
}
return chain
}
// This function assumes you've done your checking. No checking is done at this stage anymore
func (self *ChainManager) InsertChain(chain *BlockChain) {
for e := chain.Front(); e != nil; e = e.Next() {
link := e.Value.(*link)
self.SetTotalDifficulty(link.td)
self.add(link.block)
}
}
func (self *ChainManager) TestChain(chain *BlockChain) (i int, err error) {
var (
td *big.Int
)
for e := chain.Front(); e != nil; e = e.Next() {
var (
l = e.Value.(*link)
block = l.block
parent *Block
prev = e.Prev()
)
if prev == nil {
parent = self.GetBlock(block.PrevHash)
} else {
parent = prev.Value.(*link).block
}
if parent == nil {
err = fmt.Errorf("incoming chain broken on hash %x\n", block.PrevHash[0:4])
return
}
td, err = self.Ethereum.BlockManager().ProcessWithParent(block, parent)
if err != nil {
chainlogger.Infoln(err)
chainlogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
chainlogger.Debugln(block)
err = fmt.Errorf("incoming chain failed %v\n", err)
return
}
l.td = td
i++
}
if td.Cmp(self.TD) <= 0 {
err = fmt.Errorf("incoming chain has a lower or equal TD (%v <= %v)", td, self.TD)
return
}
return i, nil
}

@ -162,8 +162,9 @@ func (miner *Miner) stopMining() {
func (self *Miner) mineNewBlock() { func (self *Miner) mineNewBlock() {
blockManager := self.ethereum.BlockManager() blockManager := self.ethereum.BlockManager()
chainMan := self.ethereum.ChainManager()
self.block = self.ethereum.ChainManager().NewBlock(self.coinbase) self.block = chainMan.NewBlock(self.coinbase)
// Apply uncles // Apply uncles
if len(self.uncles) > 0 { if len(self.uncles) > 0 {
@ -199,10 +200,12 @@ func (self *Miner) mineNewBlock() {
nonce := self.pow.Search(self.block, self.powQuitChan) nonce := self.pow.Search(self.block, self.powQuitChan)
if nonce != nil { if nonce != nil {
self.block.Nonce = nonce self.block.Nonce = nonce
err := self.ethereum.BlockManager().Process(self.block) lchain := chain.NewChain(chain.Blocks{self.block})
_, err := chainMan.TestChain(lchain)
if err != nil { if err != nil {
minerlogger.Infoln(err) minerlogger.Infoln(err)
} else { } else {
self.ethereum.ChainManager().InsertChain(lchain)
self.ethereum.Broadcast(wire.MsgBlockTy, []interface{}{self.block.Value().Val}) self.ethereum.Broadcast(wire.MsgBlockTy, []interface{}{self.block.Value().Val})
minerlogger.Infof("🔨 Mined block %x\n", self.block.Hash()) minerlogger.Infof("🔨 Mined block %x\n", self.block.Hash())
minerlogger.Infoln(self.block) minerlogger.Infoln(self.block)