core: better side-chain importing

This commit is contained in:
Martin Holst Swende 2018-10-20 10:43:59 +02:00 committed by Péter Szilágyi
parent 3d997b6dec
commit 493903eede
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
3 changed files with 274 additions and 97 deletions

@ -1048,6 +1048,80 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return n, err
}
// addFutureBlock queues the given block for later processing, provided its
// timestamp does not lie further ahead of the local clock than the
// maxTimeFutureBlocks window allows. A block beyond that window is not queued
// and an error describing the violation is returned instead.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
	limit := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
	if limit.Cmp(block.Time()) >= 0 {
		// Within the accepted window: remember the block under its hash.
		bc.futureBlocks.Add(block.Hash(), block)
		return nil
	}
	return fmt.Errorf("future block: %v > %v", block.Time(), limit)
}
// importBatch is a cursor over a chain of blocks that is being imported. It
// pairs every block with the outcome of its header verification and, when that
// succeeded, of its body validation.
type importBatch struct {
	// Input: the blocks to import and how to validate their bodies.
	chain     types.Blocks
	validator Validator

	// Header verification results, consumed one per call to next().
	results <-chan error

	// Position of the block most recently returned by next(); -1 before the
	// first call.
	index int
}
// newBatch wraps the given blocks, which are assumed to form a contiguous
// chain, into an importBatch positioned before the first block. The results
// channel is read once per block as the batch is advanced; validator is used
// to check block bodies.
func newBatch(chain types.Blocks, results <-chan error, validator Validator) *importBatch {
	batch := new(importBatch)
	batch.chain = chain
	batch.results = results
	batch.validator = validator
	batch.index = -1 // nothing consumed yet
	return batch
}
// next advances the batch by one block and returns that block together with
// its validation outcome: the header verification result read from the results
// channel or, if that was nil, the result of validating the block body.
// Once the end of the chain is reached it returns (nil, nil) without moving
// the cursor, so current() keeps returning the last element.
func (batch *importBatch) next() (*types.Block, error) {
	upcoming := batch.index + 1
	if upcoming >= len(batch.chain) {
		return nil, nil
	}
	batch.index = upcoming

	block := batch.chain[upcoming]
	err := <-batch.results
	if err == nil {
		// Header is fine, so the body check decides the outcome.
		err = batch.validator.ValidateBody(block)
	}
	return block, err
}
// current returns the block the batch is positioned on, or nil if next() has
// not been called yet. After next() has walked the entire chain, current keeps
// returning the last element.
func (batch *importBatch) current() *types.Block {
	if batch.index >= 0 {
		return batch.chain[batch.index]
	}
	return nil
}
// previous returns the block immediately before the one the batch is
// positioned on, or nil if there is no such block (the batch has not advanced
// past its first element).
func (batch *importBatch) previous() *types.Block {
	if batch.index >= 1 {
		return batch.chain[batch.index-1]
	}
	return nil
}
// first returns the block at the head of the batch, regardless of how far the
// batch has advanced. It indexes the chain directly, so the chain must not be
// empty.
func (batch *importBatch) first() *types.Block {
	head := batch.chain[0]
	return head
}
// remaining returns the number of blocks from the current position to the end
// of the chain, computed as len(chain) - index. The block the batch is
// currently positioned on is therefore included in the count.
// NOTE(review): processed() counts that same block too, so the two add up to
// len(chain)+1 — confirm this is what the stats callers expect.
func (batch *importBatch) remaining() int {
	total := len(batch.chain)
	return total - batch.index
}
// processed returns how many blocks next() has handed out so far, including
// the one the batch is currently positioned on; it is 0 before the first call
// to next().
func (batch *importBatch) processed() int {
	return 1 + batch.index
}
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
@ -1067,12 +1141,27 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
log.Info("insertChain", "from", chain[0].NumberU64(), "to", chain[len(chain)-1].NumberU64())
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
defer bc.wg.Done()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
return bc.insertChainInternal(chain, true)
}
// insertChainInternal is the internal implementation of insertChain, which assumes that
// 1. chains are contiguous, and
// 2. the `chainmu` lock is held.
// This method is split out so that import batches that require re-injecting historical blocks can do
// so without releasing the lock, which could lead to racy behaviour. If a sidechain import is in progress,
// and the historic state is imported, but then a new canon-head is added before the actual sidechain completes,
// then the historic state could be pruned again.
func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
@ -1082,6 +1171,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log
block *types.Block
err error
)
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
@ -1089,16 +1180,57 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
for i, block := range chain {
headers[i] = block.Header()
seals[i] = true
seals[i] = verifySeals
}
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// Peek the error for the first block
batch := newBatch(chain, results, bc.Validator())
if block, err = batch.next(); err != nil {
if err == consensus.ErrPrunedAncestor {
return bc.insertSidechainInternal(batch, err)
} else if err == consensus.ErrFutureBlock ||
(err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(batch.first().ParentHash())) {
// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// The first block is a future block
// We can shove that one and any child blocks (that fail because of UnknownAncestor) into the future-queue
for block != nil && (batch.index == 0 || err == consensus.ErrUnknownAncestor) {
block := batch.current()
if futureError := bc.addFutureBlock(block); futureError != nil {
return batch.index, events, coalescedLogs, futureError
}
block, err = batch.next()
}
stats.queued += batch.processed()
stats.ignored += batch.remaining()
// If there are any still remaining, mark as ignored
return batch.index, events, coalescedLogs, err
} else if err == ErrKnownBlock {
// Block and state both already known -- there can be two explanations.
// 1. We did a roll-back, and should now do a re-import
// 2. The block is stored as a sidechain, and is lying about its stateroot, and passes a stateroot
// from the canonical chain, which has not been verified.
// Skip all known blocks that are blocks behind us
currentNum := bc.CurrentBlock().NumberU64()
for block != nil && err == ErrKnownBlock && currentNum >= block.NumberU64() {
// We ignore these
stats.ignored++
block, err = batch.next()
}
// Falls through to the block import
} else {
// Some other error
stats.ignored += len(batch.chain)
bc.reportBlock(block, nil, err)
return batch.index, events, coalescedLogs, err
}
}
// No validation errors
for ; block != nil && err == nil; block, err = batch.next() {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
@ -1107,115 +1239,45 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
// If the header is a banned one, straight out abort
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
return i, events, coalescedLogs, ErrBlacklistedHash
return batch.index, events, coalescedLogs, ErrBlacklistedHash
}
// Wait for the block's verification to complete
bstart := time.Now()
err := <-results
if err == nil {
err = bc.Validator().ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
stats.ignored++
continue
}
case err == consensus.ErrFutureBlock:
// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
// the chain is discarded and processed at a later time if given.
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
case err == consensus.ErrPrunedAncestor:
// Block competing with the canonical chain, store in the db, but don't process
// until the competitor TD goes above the canonical TD
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
return i, events, coalescedLogs, err
}
continue
}
// Competitor chain beat canonical, gather all blocks from the common ancestor
var winner []*types.Block
parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
for !bc.HasState(parent.Root()) {
winner = append(winner, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(winner)/2; j++ {
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
}
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
return i, events, coalescedLogs, err
}
case err != nil:
bc.reportBlock(block, nil, err)
return i, events, coalescedLogs, err
}
// Create a new statedb using the parent block and report an
// error if it fails.
var parent *types.Block
if i == 0 {
parent = batch.previous()
if parent == nil {
parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
} else {
parent = chain[i-1]
}
state, err := state.New(parent.Root(), bc.stateCache)
if err != nil {
return i, events, coalescedLogs, err
return batch.index, events, coalescedLogs, err
}
// Process block using the parent state as reference point.
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
return batch.index, events, coalescedLogs, err
}
// Validate the state using the default validator
err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
if err != nil {
if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
return batch.index, events, coalescedLogs, err
}
proctime := time.Since(bstart)
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil {
return i, events, coalescedLogs, err
return batch.index, events, coalescedLogs, err
}
switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(bstart)),
"root", block.Root().String())
coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
@ -1223,23 +1285,138 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
bc.gcproc += proctime
case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
blockInsertTimer.UpdateSince(bstart)
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root().String())
events = append(events, ChainSideEvent{block})
}
blockInsertTimer.UpdateSince(bstart)
stats.processed++
stats.usedGas += usedGas
cache, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, i, cache)
stats.report(chain, batch.index, cache)
}
// Any blocks remaining here? If so, the only ones we need to care about are
// shoving future blocks into queue
if block != nil && err == consensus.ErrFutureBlock {
if futureErr := bc.addFutureBlock(block); futureErr != nil {
return batch.index, events, coalescedLogs, futureErr
}
for block, err = batch.next(); block != nil && err == consensus.ErrUnknownAncestor; {
if futureErr := bc.addFutureBlock(block); futureErr != nil {
return batch.index, events, coalescedLogs, futureErr
}
stats.queued++
block, err = batch.next()
}
}
stats.ignored += batch.remaining()
// Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return 0, events, coalescedLogs, nil
return 0, events, coalescedLogs, err
}
// insertSidechainInternal should be called when an import batch hits a pruned
// ancestor error, which happens when a sidechain with a sufficiently old
// fork-block is found. It writes every block that keeps failing with
// ErrPrunedAncestor to disk without state, and then, if the sidechain's total
// difficulty is not below that of the current chain, re-imports the pruned
// ancestors followed by the stored sidechain blocks.
//
// batch must be positioned on the block that produced err. The returned values
// mirror those of insertChainInternal: the index of the block at which
// processing stopped, the accumulated events and logs, and any error.
//
// It assumes that relevant locks are held already (hence 'Internal').
func (bc *BlockChain) insertSidechainInternal(batch *importBatch, err error) (int, []interface{}, []*types.Log, error) {
	// If we're given a chain of blocks, and the first one is pruned, that means we're getting a
	// sidechain imported. On the sidechain, we validate headers, but do not validate body and state
	// (and actually import them) until the sidechain reaches a higher TD.
	// Until then, we store them in the database (assuming that the header PoW check works out)
	var (
		externTd        *big.Int
		canonHeadNumber = bc.CurrentBlock().NumberU64()
		events          = make([]interface{}, 0)
		coalescedLogs   []*types.Log
	)
	// The first sidechain block error is already verified to be ErrPrunedAncestor. Blocks are
	// written for as long as that same error is reported; any other outcome ends the loop and
	// the offending block is not written to disk.
	block := batch.current()
	for block != nil && (err == consensus.ErrPrunedAncestor) {
		// Check the canonical stateroot for that number
		if remoteNum := block.NumberU64(); canonHeadNumber >= remoteNum {
			canonBlock := bc.GetBlockByNumber(remoteNum)
			if canonBlock != nil && canonBlock.Root() == block.Root() {
				// This is most likely a shadow-state attack.
				// When a fork is imported into the database, and it eventually reaches a block height which is
				// not pruned, we just found that the state already exist! This means that the sidechain block
				// refers to a state which already exists in our canon chain.
				// If left unchecked, we would now proceed importing the blocks, without actually having verified
				// the state of the previous blocks.
				log.Warn("Sidechain ghost-state attack detected", "blocknum", block.NumberU64(),
					"sidechain root", block.Root(), "canon root", canonBlock.Root())
				// If someone legitimately side-mines blocks, they would still be imported as usual. However,
				// we cannot risk writing unverified blocks to disk when they obviously target the pruning
				// mechanism.
				return batch.index, events, coalescedLogs, fmt.Errorf("sidechain ghost-state attack detected")
			}
		}
		if externTd == nil {
			externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1)
		}
		externTd = new(big.Int).Add(externTd, block.Difficulty())
		if !bc.HasBlock(block.Hash(), block.NumberU64()) {
			if err := bc.WriteBlockWithoutState(block, externTd); err != nil {
				return batch.index, events, coalescedLogs, err
			}
		}
		block, err = batch.next()
	}
	// At this point, we've written all sidechain blocks to database. Loop ended either on some other error,
	// or all were processed. If there was some other error, we can ignore the rest of those blocks.
	//
	// If the externTd was larger than our local TD, we now need to reimport the previous
	// blocks to regenerate the required state
	currentBlock := bc.CurrentBlock()
	localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
	// don't process until the competitor TD goes above the canonical TD
	if localTd.Cmp(externTd) > 0 {
		// If we have hit a sidechain, we may have to reimport pruned blocks
		log.Info("Sidechain stored", "start", batch.first().NumberU64(), "end", batch.current().NumberU64(), "sidechain TD", externTd, "local TD", localTd)
		return batch.index, events, coalescedLogs, err
	}
	// Competitor chain beat canonical. Before we reprocess to get the common ancestor, investigate if
	// any blocks in the chain are 'known bad' blocks.
	for index, b := range batch.chain {
		if bc.badBlocks.Contains(b.Hash()) {
			log.Info("Sidechain import aborted, bad block found", "index", index, "hash", b.Hash())
			return index, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", b.NumberU64(), b.Hash())
		}
	}
	// Gather all ancestors whose state is missing, walking back from the first
	// sidechain block until a block with available state is reached.
	var parents []*types.Block
	parent := bc.GetBlock(batch.first().ParentHash(), batch.first().NumberU64()-1)
	for parent != nil && !bc.HasState(parent.Root()) {
		if bc.badBlocks.Contains(parent.Hash()) {
			log.Info("Sidechain parent processing aborted, bad block found", "number", parent.NumberU64(), "hash", parent.Hash())
			return 0, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", parent.NumberU64(), parent.Hash())
		}
		parents = append(parents, parent)
		parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
	}
	// GetBlock yields nil for a block that is not in the database; without an
	// ancestor that has state there is nothing to rebuild the sidechain on.
	if parent == nil {
		return 0, events, coalescedLogs, fmt.Errorf("missing parent")
	}
	// The ancestors were collected newest-first; flip them into import order.
	for j := 0; j < len(parents)/2; j++ {
		parents[j], parents[len(parents)-1-j] = parents[len(parents)-1-j], parents[j]
	}
	// Import all the pruned blocks to make the state available
	// During re-import, we can disable PoW-verification, since these are already verified
	if len(parents) > 0 {
		log.Info("Inserting parent blocks for reprocessing", "first", parents[0].NumberU64(), "count", len(parents), "last", parents[len(parents)-1].NumberU64())
		_, evs, logs, perr := bc.insertChainInternal(parents, false)
		events, coalescedLogs = evs, logs
		if perr != nil {
			return 0, events, coalescedLogs, perr
		}
	}
	// next() leaves the cursor on the last element once the batch is exhausted,
	// so when every block was stored (block == nil) the whole chain has to be
	// inserted; otherwise stop right before the block that ended the loop.
	end := batch.index
	if block == nil {
		end = len(batch.chain)
	}
	log.Info("Inserting sidechain blocks for processing")
	return bc.insertChainInternal(batch.chain[0:end], false)
}
// insertStats tracks and reports on block insertion.

@ -287,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
n.log.Debug("InProc registered", "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil

@ -434,7 +434,7 @@ func (tab *Table) loadSeedNodes() {
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
log.Debug("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
}
}