From 7b08a70a236fa9fbac967d3a7a0a20a61be9c0fb Mon Sep 17 00:00:00 2001
From: Chris Li
Date: Tue, 18 Jun 2024 15:20:23 +0800
Subject: [PATCH] perf: optimize chain commit performance for multi-database
 (#2509)

---
 core/blockchain.go           | 52 ++++++++++++++++++++++++------------
 core/genesis.go              |  2 +-
 core/headerchain.go          |  4 +--
 core/rawdb/chain_iterator.go |  2 +-
 core/rawdb/database.go       |  6 ++---
 5 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index ff4f66a47..f6f9a22c8 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -301,6 +301,7 @@ type BlockChain struct {
     diffLayerFreezerBlockLimit uint64

     wg            sync.WaitGroup
+    dbWg          sync.WaitGroup
     quit          chan struct{}  // shutdown signal, closed in Stop.
     stopping      atomic.Bool    // false if chain is running, true when stopped
     procInterrupt atomic.Bool    // interrupt signaler for block processing
@@ -669,7 +670,7 @@ func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
 // into node seamlessly.
 func (bc *BlockChain) empty() bool {
     genesis := bc.genesisBlock.Hash()
-    for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db)} {
+    for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db.BlockStore())} {
         if hash != genesis {
             return false
         }
@@ -738,7 +739,7 @@ func (bc *BlockChain) loadLastState() error {
     bc.currentSnapBlock.Store(headBlock.Header())
     headFastBlockGauge.Update(int64(headBlock.NumberU64()))

-    if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
+    if head := rawdb.ReadHeadFastBlockHash(bc.db.BlockStore()); head != (common.Hash{}) {
         if block := bc.GetBlockByHash(head); block != nil {
             bc.currentSnapBlock.Store(block.Header())
             headFastBlockGauge.Update(int64(block.NumberU64()))
@@ -1137,7 +1138,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
     // If SetHead was only called as a chain reparation method, try to skip
     // touching the header chain altogether, unless the freezer is broken
     if repair {
-        if target, force := updateFn(bc.db, bc.CurrentBlock()); force {
+        if target, force := updateFn(bc.db.BlockStore(), bc.CurrentBlock()); force {
            bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn)
        }
    } else {
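The hunks above route every head-marker read (head header hash, head block hash, head fast/snap block hash) to the dedicated block store instead of the default chain database, and introduce the dbWg wait group that writeHeadBlock uses further down. What follows is a minimal, self-contained sketch of that routing idea; the kvStore and multiStoreDB types and the literal key names are illustrative stand-ins, not the ethdb interfaces or rawdb schema used by this patch.

package main

import "fmt"

// kvStore is a toy key-value store standing in for an ethdb backend.
type kvStore map[string][]byte

func (s kvStore) Put(key string, val []byte) { s[key] = val }
func (s kvStore) Get(key string) []byte      { return s[key] }

// multiStoreDB mimics a multi-database handle: block-related data lives
// in a physically separate store from the default chain database.
type multiStoreDB struct {
	chain kvStore // default chain database
	block kvStore // dedicated block store
}

func (db *multiStoreDB) BlockStore() kvStore { return db.block }

func main() {
	db := &multiStoreDB{chain: kvStore{}, block: kvStore{}}

	// All head markers are written to the block store...
	db.BlockStore().Put("LastHeader", []byte{0x01})
	db.BlockStore().Put("LastBlock", []byte{0x01})
	db.BlockStore().Put("LastFast", []byte{0x01})

	// ...so a reader that still targets the chain database misses them,
	// while one that targets the block store sees a consistent view.
	fmt.Println("chain db sees LastFast:   ", db.chain.Get("LastFast"))
	fmt.Println("block store sees LastFast:", db.BlockStore().Get("LastFast"))
}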
@@ -1298,19 +1299,33 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
 //
 // Note, this function assumes that the `mu` mutex is held!
 func (bc *BlockChain) writeHeadBlock(block *types.Block) {
-    // Add the block to the canonical chain number scheme and mark as the head
-    rawdb.WriteCanonicalHash(bc.db.BlockStore(), block.Hash(), block.NumberU64())
-    rawdb.WriteHeadHeaderHash(bc.db.BlockStore(), block.Hash())
-    rawdb.WriteHeadBlockHash(bc.db.BlockStore(), block.Hash())
+    bc.dbWg.Add(2)
+    defer bc.dbWg.Wait()
+    go func() {
+        defer bc.dbWg.Done()
+        // Add the block to the canonical chain number scheme and mark as the head
+        blockBatch := bc.db.BlockStore().NewBatch()
+        rawdb.WriteCanonicalHash(blockBatch, block.Hash(), block.NumberU64())
+        rawdb.WriteHeadHeaderHash(blockBatch, block.Hash())
+        rawdb.WriteHeadBlockHash(blockBatch, block.Hash())
+        rawdb.WriteHeadFastBlockHash(blockBatch, block.Hash())
+        // Flush the whole batch into the disk, exit the node if failed
+        if err := blockBatch.Write(); err != nil {
+            log.Crit("Failed to update chain indexes and markers in block db", "err", err)
+        }
+    }()
+    go func() {
+        defer bc.dbWg.Done()

-    batch := bc.db.NewBatch()
-    rawdb.WriteHeadFastBlockHash(batch, block.Hash())
-    rawdb.WriteTxLookupEntriesByBlock(batch, block)
+        batch := bc.db.NewBatch()
+        rawdb.WriteTxLookupEntriesByBlock(batch, block)
+
+        // Flush the whole batch into the disk, exit the node if failed
+        if err := batch.Write(); err != nil {
+            log.Crit("Failed to update chain indexes in chain db", "err", err)
+        }
+    }()

-    // Flush the whole batch into the disk, exit the node if failed
-    if err := batch.Write(); err != nil {
-        log.Crit("Failed to update chain indexes and markers", "err", err)
-    }
     // Update all in-memory chain markers in the last step
     bc.hc.SetCurrentHeader(block.Header())

@@ -1531,7 +1546,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
         } else if !reorg {
             return false
         }
-        rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
+        rawdb.WriteHeadFastBlockHash(bc.db.BlockStore(), head.Hash())
         bc.currentSnapBlock.Store(head.Header())
         headFastBlockGauge.Update(int64(head.NumberU64()))
         return true
@@ -1774,7 +1789,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
     wg := sync.WaitGroup{}
     wg.Add(1)
     go func() {
-        rawdb.WritePreimages(bc.db, state.Preimages())
         blockBatch := bc.db.BlockStore().NewBatch()
         rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
         rawdb.WriteBlock(blockBatch, block)
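writeHeadBlock now builds two independent batches, one against the block store for the canonical hash plus all head markers and one against the chain database for the tx-lookup entries, and flushes them from two goroutines joined by bc.dbWg; because the Wait is deferred, the in-memory marker update proceeds concurrently and the function only returns once both flushes have landed. Below is a minimal sketch of that fan-out/join pattern under toy types: the batch type, the map-backed stores, and the key strings are assumptions for illustration, not the ethdb.Batch API.

package main

import (
	"fmt"
	"sync"
)

// batch buffers writes and flushes them to its backing store in one
// Write call, a toy stand-in for an ethdb batch.
type batch struct {
	store map[string][]byte
	buf   map[string][]byte
}

func newBatch(store map[string][]byte) *batch {
	return &batch{store: store, buf: map[string][]byte{}}
}

func (b *batch) Put(key string, val []byte) { b.buf[key] = val }

func (b *batch) Write() error {
	for k, v := range b.buf {
		b.store[k] = v
	}
	return nil
}

func main() {
	blockStore := map[string][]byte{} // stand-in for bc.db.BlockStore()
	chainDB := map[string][]byte{}    // stand-in for the default chain db
	head := []byte{0xaa}              // pretend head block hash

	var dbWg sync.WaitGroup
	dbWg.Add(2)

	// Goroutine 1: canonical hash and every head marker go into one
	// batch against the block store.
	go func() {
		defer dbWg.Done()
		b := newBatch(blockStore)
		b.Put("canonical-hash", head)
		b.Put("LastHeader", head)
		b.Put("LastBlock", head)
		b.Put("LastFast", head)
		if err := b.Write(); err != nil {
			panic(err) // the real code calls log.Crit, which exits the node
		}
	}()

	// Goroutine 2: tx-lookup entries go into a second batch against the
	// chain database and are flushed concurrently with goroutine 1.
	go func() {
		defer dbWg.Done()
		b := newBatch(chainDB)
		b.Put("txlookup-entry", head)
		if err := b.Write(); err != nil {
			panic(err)
		}
	}()

	// The patch defers this Wait, so its in-memory marker updates run
	// concurrently with the flushes and the function returns only after
	// both batches are on disk. Here we simply wait before inspecting.
	dbWg.Wait()
	fmt.Println("block store entries:", len(blockStore), "| chain db entries:", len(chainDB))
}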
@@ -1783,7 +1797,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
         if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
             rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
         }
-        rawdb.WritePreimages(blockBatch, state.Preimages())
+        if bc.db.StateStore() != nil {
+            rawdb.WritePreimages(bc.db.StateStore(), state.Preimages())
+        } else {
+            rawdb.WritePreimages(blockBatch, state.Preimages())
+        }
         if err := blockBatch.Write(); err != nil {
             log.Crit("Failed to write block into disk", "err", err)
         }
diff --git a/core/genesis.go b/core/genesis.go
index 986f2c79d..d6aae918f 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -498,7 +498,7 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo
     rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
     rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64())
     rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash())
-    rawdb.WriteHeadFastBlockHash(db, block.Hash())
+    rawdb.WriteHeadFastBlockHash(db.BlockStore(), block.Hash())
     rawdb.WriteHeadHeaderHash(db.BlockStore(), block.Hash())
     rawdb.WriteChainConfig(db, block.Hash(), config)
     return block, nil
diff --git a/core/headerchain.go b/core/headerchain.go
index a48ef4619..f09ab4347 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -668,7 +668,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
     // first then remove the relative data from the database.
     //
     // Update head first(head fast block, head full block) before deleting the data.
-    markerBatch := hc.chainDb.NewBatch()
+    markerBatch := hc.chainDb.BlockStore().NewBatch()
     if updateFn != nil {
         newHead, force := updateFn(markerBatch, parent)
         if force && ((headTime > 0 && newHead.Time < headTime) || (headTime == 0 && newHead.Number.Uint64() < headBlock)) {
@@ -677,7 +677,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
         }
     }
     // Update head header then.
-    rawdb.WriteHeadHeaderHash(hc.chainDb.BlockStore(), parentHash)
+    rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
     if err := markerBatch.Write(); err != nil {
         log.Crit("Failed to update chain markers", "error", err)
     }
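With a separate state database configured, writeBlockWithState now sends trie preimages to db.StateStore() rather than into the block-store batch, falling back to the old behaviour when no state store exists. A small sketch of that conditional routing follows; the multiDB and store types, the writePreimages helper, and the "secure-key-" prefix are illustrative stand-ins rather than the rawdb helpers themselves.

package main

import "fmt"

// store is a toy key-value store standing in for an ethdb backend.
type store map[string][]byte

func (s store) Put(key string, val []byte) { s[key] = val }

// multiDB mimics the shape of the multi-database handle: StateStore
// returns nil when no separate state database has been configured.
type multiDB struct {
	blockStore store
	stateStore store // nil in single-database mode
}

func (db *multiDB) BlockStore() store { return db.blockStore }
func (db *multiDB) StateStore() store { return db.stateStore }

// writePreimages routes trie preimages to the state store when one
// exists, and otherwise falls back to the block batch/store, mirroring
// the conditional introduced in writeBlockWithState above.
func writePreimages(db *multiDB, blockBatch store, preimages map[string][]byte) {
	target := blockBatch
	if db.StateStore() != nil {
		target = db.StateStore()
	}
	for hash, preimage := range preimages {
		target.Put("secure-key-"+hash, preimage)
	}
}

func main() {
	preimages := map[string][]byte{"0xabc": []byte("preimage-bytes")}

	single := &multiDB{blockStore: store{}}
	writePreimages(single, single.BlockStore(), preimages)
	fmt.Println("single-db: preimages in block store:", len(single.blockStore))

	multi := &multiDB{blockStore: store{}, stateStore: store{}}
	writePreimages(multi, multi.BlockStore(), preimages)
	fmt.Println("multi-db: preimages in state store:", len(multi.stateStore))
}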
diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go
index f78af9bb6..387040108 100644
--- a/core/rawdb/chain_iterator.go
+++ b/core/rawdb/chain_iterator.go
@@ -81,7 +81,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
     batch.Reset()

     WriteHeadHeaderHash(db.BlockStore(), hash)
-    WriteHeadFastBlockHash(db, hash)
+    WriteHeadFastBlockHash(db.BlockStore(), hash)
     log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
 }
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index 39dfcc511..ad4e3f81c 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -873,7 +873,7 @@ func DataTypeByKey(key []byte) DataType {
             return StateDataType
         }
     }
-    for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
+    for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
         if bytes.Equal(key, meta) {
             return BlockDataType
         }
     }
@@ -1088,7 +1088,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
             hashNumPairings.Add(size)
         default:
             var accounted bool
-            for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
+            for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
                 if bytes.Equal(key, meta) {
                     metadata.Add(size)
                     accounted = true
@@ -1282,7 +1282,7 @@ func ReadChainMetadataFromMultiDatabase(db ethdb.Database) [][]string {
     data := [][]string{
         {"databaseVersion", pp(ReadDatabaseVersion(db))},
         {"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))},
-        {"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db))},
+        {"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db.BlockStore()))},
         {"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))},
         {"lastPivotNumber", pp(ReadLastPivotNumber(db))},
         {"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))},
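The rawdb changes extend the metadata classification so that headBlockKey and headFastBlockKey are treated as block data, which is what lets the multi-database layer route those markers to the block store and lets InspectDatabase account for them there. A toy version of that classification is sketched below; the dataType constants, function name, and key byte values are assumptions modelled on the rawdb schema, not the actual definitions.

package main

import (
	"bytes"
	"fmt"
)

// dataType is a toy version of the rawdb data-type classification.
type dataType int

const (
	chainDataType dataType = iota
	blockDataType
)

// Hypothetical copies of the fixed metadata keys; the real values live
// in core/rawdb/schema.go.
var (
	headHeaderKey         = []byte("LastHeader")
	headBlockKey          = []byte("LastBlock")
	headFastBlockKey      = []byte("LastFast")
	headFinalizedBlockKey = []byte("LastFinalized")
)

// dataTypeByKey mirrors the extended classification above: every head
// marker now counts as block data, so reads and writes of these keys are
// routed to the block store instead of the default chain database.
func dataTypeByKey(key []byte) dataType {
	for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
		if bytes.Equal(key, meta) {
			return blockDataType
		}
	}
	return chainDataType
}

func main() {
	for _, key := range [][]byte{headFastBlockKey, []byte("SomethingElse")} {
		fmt.Printf("%-14s -> block data? %v\n", key, dataTypeByKey(key) == blockDataType)
	}
}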