From 770316dc206618c97afaac9deceb1f9b878488ea Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 17 Jan 2020 18:49:32 +0800 Subject: [PATCH] core, light: write chain data in atomic way (#20287) * core: write chain data in atomic way * core, light: address comments * core, light: fix linter * core, light: address comments --- core/blockchain.go | 140 ++++++++++++++++++++++++++------------------ core/headerchain.go | 85 ++++++++++++++++----------- light/lightchain.go | 21 +++++-- 3 files changed, 150 insertions(+), 96 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ebab7a3ab..f083f53e0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -407,6 +407,11 @@ func (bc *BlockChain) SetHead(head uint64) error { } } rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentBlock.Store(newHeadBlock) headBlockGauge.Update(int64(newHeadBlock.NumberU64())) } @@ -419,6 +424,11 @@ func (bc *BlockChain) SetHead(head uint64) error { newHeadFastBlock = bc.genesisBlock } rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. bc.currentFastBlock.Store(newHeadFastBlock) headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) } @@ -538,21 +548,22 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { - log.Crit("Failed to write genesis block TD", "err", err) + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + if err := batch.Write(); err != nil { + log.Crit("Failed to write genesis block", "err", err) } - rawdb.WriteBlock(bc.db, genesis) + bc.writeHeadBlock(genesis) + // Last update all in-memory chain markers bc.genesisBlock = genesis - bc.insert(bc.genesisBlock) bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock.Store(bc.genesisBlock) headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - return nil } @@ -610,31 +621,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a new head block into the current block chain. This method +// writeHeadBlock injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block if they are older // or if they are on a different side chain. // // Note, this function assumes that the `mu` mutex is held! -func (bc *BlockChain) insert(block *types.Block) { +func (bc *BlockChain) writeHeadBlock(block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(bc.db, block.Hash()) - - bc.currentBlock.Store(block) - headBlockGauge.Update(int64(block.NumberU64())) + batch := bc.db.NewBatch() + rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) + rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteHeadBlockHash(batch, block.Hash()) // If the block is better than our head or is on a different chain, force update heads + if updateHeads { + rawdb.WriteHeadHeaderHash(batch, block.Hash()) + rawdb.WriteHeadFastBlockHash(batch, block.Hash()) + } + // Flush the whole batch into the disk, exit the node if failed + if err := batch.Write(); err != nil { + log.Crit("Failed to update chain indexes and markers", "err", err) + } + // Update all in-memory chain markers in the last step if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) - bc.currentFastBlock.Store(block) headFastBlockGauge.Update(int64(block.NumberU64())) } + bc.currentBlock.Store(block) + headBlockGauge.Update(int64(block.NumberU64())) } // Genesis retrieves the chain's genesis block. @@ -881,26 +900,36 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { bc.chainmu.Lock() defer bc.chainmu.Unlock() + batch := bc.db.NewBatch() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of rollback is from high + // to low, so it's safe the update in-memory markers directly. currentHeader := bc.hc.CurrentHeader() if currentHeader.Hash() == hash { - bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)) + newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1) + rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash) + bc.hc.SetCurrentHeader(newHeadHeader) } if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) - rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) + rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash()) bc.currentFastBlock.Store(newFastBlock) headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) } if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) - rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) + rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash()) bc.currentBlock.Store(newBlock) headBlockGauge.Update(int64(newBlock.NumberU64())) } } + if err := batch.Write(); err != nil { + log.Crit("Failed to rollback chain markers", "err", err) + } // Truncate ancient data which exceeds the current header. // // Notably, it can happen that system crashes without truncating the ancient data @@ -1063,7 +1092,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Don't collect too much in-memory, write it out every 100K blocks if len(deleted) > 100000 { - // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { return 0, err @@ -1172,7 +1200,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) rawdb.WriteTxLookupEntries(batch, block) - stats.processed++ + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return 0, err @@ -1180,7 +1210,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += batch.ValueSize() batch.Reset() } + stats.processed++ } + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() > 0 { size += batch.ValueSize() if err := batch.Write(); err != nil { @@ -1231,11 +1265,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e bc.wg.Add(1) defer bc.wg.Done() - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { - return err + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) + rawdb.WriteBlock(batch, block) + if err := batch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) } - rawdb.WriteBlock(bc.db, block) - return nil } @@ -1251,11 +1286,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { return err } } - // Write the positional metadata for transaction/receipt lookups. - // Preimages here is empty, ignore it. - rawdb.WriteTxLookupEntries(bc.db, block) - - bc.insert(block) + bc.writeHeadBlock(block) return nil } @@ -1283,12 +1314,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(block.Difficulty(), ptd) - // Irrelevant of the canonical status, write the block itself to the database - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { - return NonStatTy, err + // Irrelevant of the canonical status, write the block itself to the database. + // + // Note all the components of block(td, hash->number map, header, body, receipts) + // should be written atomically. BlockBatch is used for containing all components. + blockBatch := bc.db.NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) + rawdb.WriteBlock(blockBatch, block) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WritePreimages(blockBatch, state.Preimages()) + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) } - rawdb.WriteBlock(bc.db, block) - + // Commit all cached state changes into underlying memory database. root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { return NonStatTy, err @@ -1347,11 +1385,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } } } - - // Write other block data using a batch. - batch := bc.db.NewBatch() - rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -1377,21 +1410,13 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return NonStatTy, err } } - // Write the positional metadata for transaction/receipt lookups and preimages - rawdb.WriteTxLookupEntries(batch, block) - rawdb.WritePreimages(batch, state.Preimages()) - status = CanonStatTy } else { status = SideStatTy } - if err := batch.Write(); err != nil { - return NonStatTy, err - } - // Set new head. if status == CanonStatTy { - bc.insert(block) + bc.writeHeadBlock(block) } bc.futureBlocks.Remove(block.Hash()) @@ -1991,20 +2016,19 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - bc.insert(newChain[i]) + bc.writeHeadBlock(newChain[i]) // Collect reborn logs due to chain reorg collectLogs(newChain[i].Hash(), false) - // Write lookup entries for hash based transaction/receipt searches - rawdb.WriteTxLookupEntries(bc.db, newChain[i]) + // Collect the new added transactions. addedTxs = append(addedTxs, newChain[i].Transactions()...) } - // When transactions get deleted from the database, the receipts that were - // created in the fork must also be deleted - batch := bc.db.NewBatch() + // Delete useless indexes right now which includes the non-canonical + // transaction indexes, canonical chain indexes which above the head. + indexesBatch := bc.db.NewBatch() for _, tx := range types.TxDifference(deletedTxs, addedTxs) { - rawdb.DeleteTxLookupEntry(batch, tx.Hash()) + rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) } // Delete any canonical number assignments above the new head number := bc.CurrentBlock().NumberU64() @@ -2013,9 +2037,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(batch, i) + rawdb.DeleteCanonicalHash(indexesBatch, i) + } + if err := indexesBatch.Write(); err != nil { + log.Crit("Failed to delete useless indexes", "err", err) } - batch.Write() // If any logs need to be fired, do it now. In theory we could avoid creating // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle diff --git a/core/headerchain.go b/core/headerchain.go index 4682069cf..f21dcf537 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -45,6 +45,14 @@ const ( // HeaderChain implements the basic block header chain logic that is shared by // core.BlockChain and light.LightChain. It is not usable in itself, only as // a part of either structure. +// +// HeaderChain is responsible for maintaining the header chain including the +// header query and updating. +// +// The components maintained by headerchain includes: (1) total difficult +// (2) header (3) block hash -> number mapping (4) canonical number -> hash mapping +// and (5) head header flag. +// // It is not thread safe either, the encapsulating chain structures should do // the necessary mutex locking/unlocking. type HeaderChain struct { @@ -66,10 +74,8 @@ type HeaderChain struct { engine consensus.Engine } -// NewHeaderChain creates a new HeaderChain structure. -// getValidator should return the parent's validator -// procInterrupt points to the parent's interrupt semaphore -// wg points to the parent's shutdown wait group +// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points +// to the parent's interrupt semaphore. func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) { headerCache, _ := lru.New(headerCacheLimit) tdCache, _ := lru.New(tdCacheLimit) @@ -147,25 +153,33 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er externTd := new(big.Int).Add(header.Difficulty, ptd) // Irrelevant of the canonical status, write the td and header to the database - if err := hc.WriteTd(hash, number, externTd); err != nil { - log.Crit("Failed to write header total difficulty", "err", err) + // + // Note all the components of header(td, hash->number index and header) should + // be written atomically. + headerBatch := hc.chainDb.NewBatch() + rawdb.WriteTd(headerBatch, hash, number, externTd) + rawdb.WriteHeader(headerBatch, header) + if err := headerBatch.Write(); err != nil { + log.Crit("Failed to write header into disk", "err", err) } - rawdb.WriteHeader(hc.chainDb, header) - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { + // If the header can be added into canonical chain, adjust the + // header chain markers(canonical indexes and head header flag). + // + // Note all markers should be written atomically. + // Delete any canonical number assignments above the new head - batch := hc.chainDb.NewBatch() + markerBatch := hc.chainDb.NewBatch() for i := number + 1; ; i++ { hash := rawdb.ReadCanonicalHash(hc.chainDb, i) if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(batch, i) + rawdb.DeleteCanonicalHash(markerBatch, i) } - batch.Write() // Overwrite any stale canonical number assignments var ( @@ -174,16 +188,19 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er headHeader = hc.GetHeader(headHash, headNumber) ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { - rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber) + rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber) headHash = headHeader.ParentHash headNumber = headHeader.Number.Uint64() - 1 headHeader = hc.GetHeader(headHash, headNumber) } // Extend the canonical chain with the new header - rawdb.WriteCanonicalHash(hc.chainDb, hash, number) - rawdb.WriteHeadHeaderHash(hc.chainDb, hash) - + rawdb.WriteCanonicalHash(markerBatch, hash, number) + rawdb.WriteHeadHeaderHash(markerBatch, hash) + if err := markerBatch.Write(); err != nil { + log.Crit("Failed to write header markers into disk", "err", err) + } + // Last step update all in-memory head header markers hc.currentHeaderHash = hash hc.currentHeader.Store(types.CopyHeader(header)) headHeaderGauge.Update(header.Number.Int64()) @@ -192,9 +209,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er } else { status = SideStatTy } + hc.tdCache.Add(hash, externTd) hc.headerCache.Add(hash, header) hc.numberCache.Add(hash, number) - return } @@ -396,14 +413,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int { return hc.GetTd(hash, *number) } -// WriteTd stores a block's total difficulty into the database, also caching it -// along the way. -func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error { - rawdb.WriteTd(hc.chainDb, hash, number, td) - hc.tdCache.Add(hash, new(big.Int).Set(td)) - return nil -} - // GetHeader retrieves a block header from the database by hash and number, // caching it if found. func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header { @@ -431,6 +440,8 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header { } // HasHeader checks if a block header is present in the database or not. +// In theory, if header is present in the database, all relative components +// like td and hash->number should be present too. func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool { if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) { return true @@ -458,10 +469,9 @@ func (hc *HeaderChain) CurrentHeader() *types.Header { return hc.currentHeader.Load().(*types.Header) } -// SetCurrentHeader sets the current head header of the canonical chain. +// SetCurrentHeader sets the in-memory head header marker of the canonical chan +// as the given header. func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { - rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash()) - hc.currentHeader.Store(head) hc.currentHeaderHash = head.Hash() headHeaderGauge.Update(head.Number.Int64()) @@ -500,11 +510,18 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d // first then remove the relative data from the database. // // Update head first(head fast block, head full block) before deleting the data. + markerBatch := hc.chainDb.NewBatch() if updateFn != nil { - updateFn(hc.chainDb, parent) + updateFn(markerBatch, parent) } // Update head header then. - rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash) + rawdb.WriteHeadHeaderHash(markerBatch, parentHash) + if err := markerBatch.Write(); err != nil { + log.Crit("Failed to update chain markers", "error", err) + } + hc.currentHeader.Store(parent) + hc.currentHeaderHash = parentHash + headHeaderGauge.Update(parent.Number.Int64()) // Remove the relative data from the database. if delFn != nil { @@ -514,13 +531,11 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d rawdb.DeleteHeader(batch, hash, num) rawdb.DeleteTd(batch, hash, num) rawdb.DeleteCanonicalHash(batch, num) - - hc.currentHeader.Store(parent) - hc.currentHeaderHash = parentHash - headHeaderGauge.Update(parent.Number.Int64()) } - batch.Write() - + // Flush all accumulated deletions. + if err := batch.Write(); err != nil { + log.Crit("Failed to rewind block", "error", err) + } // Clear out any stale content from the caches hc.headerCache.Purge() hc.tdCache.Purge() diff --git a/light/lightchain.go b/light/lightchain.go index 90ea26323..636e06f51 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -159,7 +159,6 @@ func (lc *LightChain) loadLastState() error { lc.hc.SetCurrentHeader(header) } } - // Issue a status log and return header := lc.hc.CurrentHeader() headerTd := lc.GetTd(header.Hash(), header.Number.Uint64()) @@ -198,9 +197,13 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) { defer lc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - rawdb.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) - rawdb.WriteBlock(lc.chainDb, genesis) - + batch := lc.chainDb.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + rawdb.WriteHeadHeaderHash(batch, genesis.Hash()) + if err := batch.Write(); err != nil { + log.Crit("Failed to reset genesis block", "err", err) + } lc.genesisBlock = genesis lc.hc.SetGenesis(lc.genesisBlock.Header()) lc.hc.SetCurrentHeader(lc.genesisBlock.Header()) @@ -323,13 +326,22 @@ func (lc *LightChain) Rollback(chain []common.Hash) { lc.chainmu.Lock() defer lc.chainmu.Unlock() + batch := lc.chainDb.NewBatch() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of rollback is from high + // to low, so it's safe the update in-memory markers directly. if head := lc.hc.CurrentHeader(); head.Hash() == hash { + rawdb.WriteHeadHeaderHash(batch, head.ParentHash) lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1)) } } + if err := batch.Write(); err != nil { + log.Crit("Failed to rollback light chain", "error", err) + } } // postChainEvents iterates over the events generated by a chain insertion and @@ -493,6 +505,7 @@ func (lc *LightChain) SyncCheckpoint(ctx context.Context, checkpoint *params.Tru // Ensure the chain didn't move past the latest block while retrieving it if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() { log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash(), "age", common.PrettyAge(time.Unix(int64(header.Time), 0))) + rawdb.WriteHeadHeaderHash(lc.chainDb, header.Hash()) lc.hc.SetCurrentHeader(header) } return true