perf: optimize chain commit performance for multi-database (#2509)
This commit is contained in:
parent
99d31aeb28
commit
7b08a70a23
@ -301,6 +301,7 @@ type BlockChain struct {
|
|||||||
diffLayerFreezerBlockLimit uint64
|
diffLayerFreezerBlockLimit uint64
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
dbWg sync.WaitGroup
|
||||||
quit chan struct{} // shutdown signal, closed in Stop.
|
quit chan struct{} // shutdown signal, closed in Stop.
|
||||||
stopping atomic.Bool // false if chain is running, true when stopped
|
stopping atomic.Bool // false if chain is running, true when stopped
|
||||||
procInterrupt atomic.Bool // interrupt signaler for block processing
|
procInterrupt atomic.Bool // interrupt signaler for block processing
|
||||||
@ -669,7 +670,7 @@ func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
|
|||||||
// into node seamlessly.
|
// into node seamlessly.
|
||||||
func (bc *BlockChain) empty() bool {
|
func (bc *BlockChain) empty() bool {
|
||||||
genesis := bc.genesisBlock.Hash()
|
genesis := bc.genesisBlock.Hash()
|
||||||
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db)} {
|
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db.BlockStore())} {
|
||||||
if hash != genesis {
|
if hash != genesis {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -738,7 +739,7 @@ func (bc *BlockChain) loadLastState() error {
|
|||||||
bc.currentSnapBlock.Store(headBlock.Header())
|
bc.currentSnapBlock.Store(headBlock.Header())
|
||||||
headFastBlockGauge.Update(int64(headBlock.NumberU64()))
|
headFastBlockGauge.Update(int64(headBlock.NumberU64()))
|
||||||
|
|
||||||
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
|
if head := rawdb.ReadHeadFastBlockHash(bc.db.BlockStore()); head != (common.Hash{}) {
|
||||||
if block := bc.GetBlockByHash(head); block != nil {
|
if block := bc.GetBlockByHash(head); block != nil {
|
||||||
bc.currentSnapBlock.Store(block.Header())
|
bc.currentSnapBlock.Store(block.Header())
|
||||||
headFastBlockGauge.Update(int64(block.NumberU64()))
|
headFastBlockGauge.Update(int64(block.NumberU64()))
|
||||||
@ -1137,7 +1138,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
|
|||||||
// If SetHead was only called as a chain reparation method, try to skip
|
// If SetHead was only called as a chain reparation method, try to skip
|
||||||
// touching the header chain altogether, unless the freezer is broken
|
// touching the header chain altogether, unless the freezer is broken
|
||||||
if repair {
|
if repair {
|
||||||
if target, force := updateFn(bc.db, bc.CurrentBlock()); force {
|
if target, force := updateFn(bc.db.BlockStore(), bc.CurrentBlock()); force {
|
||||||
bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn)
|
bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1298,19 +1299,33 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
|
|||||||
//
|
//
|
||||||
// Note, this function assumes that the `mu` mutex is held!
|
// Note, this function assumes that the `mu` mutex is held!
|
||||||
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
|
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
|
||||||
|
bc.dbWg.Add(2)
|
||||||
|
defer bc.dbWg.Wait()
|
||||||
|
go func() {
|
||||||
|
defer bc.dbWg.Done()
|
||||||
// Add the block to the canonical chain number scheme and mark as the head
|
// Add the block to the canonical chain number scheme and mark as the head
|
||||||
rawdb.WriteCanonicalHash(bc.db.BlockStore(), block.Hash(), block.NumberU64())
|
blockBatch := bc.db.BlockStore().NewBatch()
|
||||||
rawdb.WriteHeadHeaderHash(bc.db.BlockStore(), block.Hash())
|
rawdb.WriteCanonicalHash(blockBatch, block.Hash(), block.NumberU64())
|
||||||
rawdb.WriteHeadBlockHash(bc.db.BlockStore(), block.Hash())
|
rawdb.WriteHeadHeaderHash(blockBatch, block.Hash())
|
||||||
|
rawdb.WriteHeadBlockHash(blockBatch, block.Hash())
|
||||||
|
rawdb.WriteHeadFastBlockHash(blockBatch, block.Hash())
|
||||||
|
// Flush the whole batch into the disk, exit the node if failed
|
||||||
|
if err := blockBatch.Write(); err != nil {
|
||||||
|
log.Crit("Failed to update chain indexes and markers in block db", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer bc.dbWg.Done()
|
||||||
|
|
||||||
batch := bc.db.NewBatch()
|
batch := bc.db.NewBatch()
|
||||||
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
|
|
||||||
rawdb.WriteTxLookupEntriesByBlock(batch, block)
|
rawdb.WriteTxLookupEntriesByBlock(batch, block)
|
||||||
|
|
||||||
// Flush the whole batch into the disk, exit the node if failed
|
// Flush the whole batch into the disk, exit the node if failed
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
log.Crit("Failed to update chain indexes and markers", "err", err)
|
log.Crit("Failed to update chain indexes in chain db", "err", err)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Update all in-memory chain markers in the last step
|
// Update all in-memory chain markers in the last step
|
||||||
bc.hc.SetCurrentHeader(block.Header())
|
bc.hc.SetCurrentHeader(block.Header())
|
||||||
|
|
||||||
@ -1531,7 +1546,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
|
|||||||
} else if !reorg {
|
} else if !reorg {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
|
rawdb.WriteHeadFastBlockHash(bc.db.BlockStore(), head.Hash())
|
||||||
bc.currentSnapBlock.Store(head.Header())
|
bc.currentSnapBlock.Store(head.Header())
|
||||||
headFastBlockGauge.Update(int64(head.NumberU64()))
|
headFastBlockGauge.Update(int64(head.NumberU64()))
|
||||||
return true
|
return true
|
||||||
@ -1774,7 +1789,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
rawdb.WritePreimages(bc.db, state.Preimages())
|
|
||||||
blockBatch := bc.db.BlockStore().NewBatch()
|
blockBatch := bc.db.BlockStore().NewBatch()
|
||||||
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
|
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
|
||||||
rawdb.WriteBlock(blockBatch, block)
|
rawdb.WriteBlock(blockBatch, block)
|
||||||
@ -1783,7 +1797,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
|
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
|
||||||
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
|
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
|
||||||
}
|
}
|
||||||
|
if bc.db.StateStore() != nil {
|
||||||
|
rawdb.WritePreimages(bc.db.StateStore(), state.Preimages())
|
||||||
|
} else {
|
||||||
rawdb.WritePreimages(blockBatch, state.Preimages())
|
rawdb.WritePreimages(blockBatch, state.Preimages())
|
||||||
|
}
|
||||||
if err := blockBatch.Write(); err != nil {
|
if err := blockBatch.Write(); err != nil {
|
||||||
log.Crit("Failed to write block into disk", "err", err)
|
log.Crit("Failed to write block into disk", "err", err)
|
||||||
}
|
}
|
||||||
|
@ -498,7 +498,7 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo
|
|||||||
rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
|
rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
|
||||||
rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64())
|
rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64())
|
||||||
rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash())
|
rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash())
|
||||||
rawdb.WriteHeadFastBlockHash(db, block.Hash())
|
rawdb.WriteHeadFastBlockHash(db.BlockStore(), block.Hash())
|
||||||
rawdb.WriteHeadHeaderHash(db.BlockStore(), block.Hash())
|
rawdb.WriteHeadHeaderHash(db.BlockStore(), block.Hash())
|
||||||
rawdb.WriteChainConfig(db, block.Hash(), config)
|
rawdb.WriteChainConfig(db, block.Hash(), config)
|
||||||
return block, nil
|
return block, nil
|
||||||
|
@ -668,7 +668,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
|
|||||||
// first then remove the relative data from the database.
|
// first then remove the relative data from the database.
|
||||||
//
|
//
|
||||||
// Update head first(head fast block, head full block) before deleting the data.
|
// Update head first(head fast block, head full block) before deleting the data.
|
||||||
markerBatch := hc.chainDb.NewBatch()
|
markerBatch := hc.chainDb.BlockStore().NewBatch()
|
||||||
if updateFn != nil {
|
if updateFn != nil {
|
||||||
newHead, force := updateFn(markerBatch, parent)
|
newHead, force := updateFn(markerBatch, parent)
|
||||||
if force && ((headTime > 0 && newHead.Time < headTime) || (headTime == 0 && newHead.Number.Uint64() < headBlock)) {
|
if force && ((headTime > 0 && newHead.Time < headTime) || (headTime == 0 && newHead.Number.Uint64() < headBlock)) {
|
||||||
@ -677,7 +677,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Update head header then.
|
// Update head header then.
|
||||||
rawdb.WriteHeadHeaderHash(hc.chainDb.BlockStore(), parentHash)
|
rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
|
||||||
if err := markerBatch.Write(); err != nil {
|
if err := markerBatch.Write(); err != nil {
|
||||||
log.Crit("Failed to update chain markers", "error", err)
|
log.Crit("Failed to update chain markers", "error", err)
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
|
|||||||
batch.Reset()
|
batch.Reset()
|
||||||
|
|
||||||
WriteHeadHeaderHash(db.BlockStore(), hash)
|
WriteHeadHeaderHash(db.BlockStore(), hash)
|
||||||
WriteHeadFastBlockHash(db, hash)
|
WriteHeadFastBlockHash(db.BlockStore(), hash)
|
||||||
log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
|
log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -873,7 +873,7 @@ func DataTypeByKey(key []byte) DataType {
|
|||||||
return StateDataType
|
return StateDataType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
|
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
|
||||||
if bytes.Equal(key, meta) {
|
if bytes.Equal(key, meta) {
|
||||||
return BlockDataType
|
return BlockDataType
|
||||||
}
|
}
|
||||||
@ -1088,7 +1088,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||||||
hashNumPairings.Add(size)
|
hashNumPairings.Add(size)
|
||||||
default:
|
default:
|
||||||
var accounted bool
|
var accounted bool
|
||||||
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
|
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
|
||||||
if bytes.Equal(key, meta) {
|
if bytes.Equal(key, meta) {
|
||||||
metadata.Add(size)
|
metadata.Add(size)
|
||||||
accounted = true
|
accounted = true
|
||||||
@ -1282,7 +1282,7 @@ func ReadChainMetadataFromMultiDatabase(db ethdb.Database) [][]string {
|
|||||||
data := [][]string{
|
data := [][]string{
|
||||||
{"databaseVersion", pp(ReadDatabaseVersion(db))},
|
{"databaseVersion", pp(ReadDatabaseVersion(db))},
|
||||||
{"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))},
|
{"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))},
|
||||||
{"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db))},
|
{"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db.BlockStore()))},
|
||||||
{"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))},
|
{"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))},
|
||||||
{"lastPivotNumber", pp(ReadLastPivotNumber(db))},
|
{"lastPivotNumber", pp(ReadLastPivotNumber(db))},
|
||||||
{"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))},
|
{"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))},
|
||||||
|
Loading…
Reference in New Issue
Block a user