feat: support rewind for versa db

fix: rewind ancient store
This commit is contained in:
joeycli 2024-08-27 15:51:30 +08:00
parent 4ed6b8e36f
commit a5e12fe178
2 changed files with 125 additions and 49 deletions

@ -428,42 +428,83 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Make sure the state associated with the block is available, or log out // Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync. // if there is no available state, waiting for state sync.
head := bc.CurrentBlock() head := bc.CurrentBlock()
if !bc.HasState(head.Root) { if bc.triedb.Scheme() != rawdb.VersionScheme {
if head.Number.Uint64() == 0 { if !bc.HasState(head.Root) {
// The genesis state is missing, which is only possible in the path-based if head.Number.Uint64() == 0 {
// scheme. This situation occurs when the initial state sync is not finished // The genesis state is missing, which is only possible in the path-based
// yet, or the chain head is rewound below the pivot point. In both scenarios, // scheme. This situation occurs when the initial state sync is not finished
// there is no possible recovery approach except for rerunning a snap sync. // yet, or the chain head is rewound below the pivot point. In both scenarios,
// Do nothing here until the state syncer picks it up. // there is no possible recovery approach except for rerunning a snap sync.
log.Info("Genesis state is missing, wait state sync") // Do nothing here until the state syncer picks it up.
} else { log.Info("Genesis state is missing, wait state sync")
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme && !bc.NoTries() {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "diskRoot", diskRoot)
snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else { } else {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash()) // Head state is missing, before the state recovery, find out the
if _, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, common.Hash{}, true); err != nil { // disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme && !bc.NoTries() {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "diskRoot", diskRoot)
snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash())
if _, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, common.Hash{}, true); err != nil {
return nil, err
}
}
}
}
} else {
log.Warn("versa db no recovery, rewind in load state")
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
if bc.triedb.Scheme() != rawdb.VersionScheme {
if frozen, err := bc.db.BlockStore().ItemAmountInAncient(); err == nil && frozen > 0 {
frozen, err = bc.db.BlockStore().Ancients()
if err != nil {
return nil, err
}
var (
needRewind bool
low uint64
)
// The head full block may be rolled back to a very low height due to
// blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store.
fullBlock := bc.CurrentBlock()
if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.Number.Uint64() < frozen-1 {
needRewind = true
low = fullBlock.Number.Uint64()
}
// In snap sync, it may happen that ancient data has been written to the
// ancient store, but the LastFastBlock has not been updated, truncate the
// extra data here.
snapBlock := bc.CurrentSnapBlock()
if snapBlock != nil && snapBlock.Number.Uint64() < frozen-1 {
needRewind = true
if snapBlock.Number.Uint64() < low || low == 0 {
low = snapBlock.Number.Uint64()
}
}
if needRewind {
log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
if err := bc.SetHead(low); err != nil {
return nil, err return nil, err
} }
} }
@ -725,20 +766,54 @@ func (bc *BlockChain) getFinalizedNumber(header *types.Header) uint64 {
// loadLastState loads the last known chain state from the database. This method // loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held. // assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState() error { func (bc *BlockChain) loadLastState() error {
// Restore the last known head block // TODO:: before versa db support recovery, only rewind
head := rawdb.ReadHeadBlockHash(bc.db) var headBlock *types.Block
if head == (common.Hash{}) { if bc.triedb.Scheme() == rawdb.VersionScheme {
// Corrupt or empty database, init from scratch head := rawdb.ReadHeadBlockHash(bc.db)
log.Warn("Empty database, resetting chain") headBlock = bc.GetBlockByHash(head)
return bc.Reset()
} versa := bc.triedb.VersaDB()
// Make sure the entire head block is available archiveVersion, _ := versa.LatestStoreDiskVersionInfo()
headBlock := bc.GetBlockByHash(head) // empty chain
if headBlock == nil { if archiveVersion == -1 {
// Corrupt or empty database, init from scratch archiveVersion = 0
log.Warn("Head block missing, resetting chain", "hash", head) }
return bc.Reset()
if int64(headBlock.NumberU64()) < archiveVersion {
log.Crit("versa db disk version large than header block", "head number", headBlock.NumberU64(), "versa archive number", archiveVersion)
}
log.Info("begin rewind versa db head", "target", archiveVersion)
for {
if int64(headBlock.NumberU64()) == archiveVersion {
rawdb.WriteCanonicalHash(bc.db, headBlock.Hash(), headBlock.NumberU64())
rawdb.WriteHeadHeaderHash(bc.db, headBlock.Hash())
rawdb.WriteHeadBlockHash(bc.db, headBlock.Hash())
rawdb.WriteHeadFastBlockHash(bc.db, headBlock.Hash())
log.Info("reset versa db head block", "number", headBlock.NumberU64(), "hash", headBlock.Hash())
break
}
headBlock = rawdb.ReadBlock(bc.db, headBlock.ParentHash(), headBlock.NumberU64()-1)
if headBlock == nil {
panic("versa db rewind head is nil")
}
}
} else {
// Restore the last known head block
head := rawdb.ReadHeadBlockHash(bc.db)
if head == (common.Hash{}) {
// Corrupt or empty database, init from scratch
log.Warn("Empty database, resetting chain")
return bc.Reset()
}
// Make sure the entire head block is available
headBlock = bc.GetBlockByHash(head)
if headBlock == nil {
// Corrupt or empty database, init from scratch
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
} }
log.Info("load state head block", "number", headBlock.NumberU64())
// Everything seems to be fine, set as the head block // Everything seems to be fine, set as the head block
bc.currentBlock.Store(headBlock.Header()) bc.currentBlock.Store(headBlock.Header())

@ -49,7 +49,8 @@ func (v *VersionDB) Scheme() string {
} }
func (v *VersionDB) Initialized(genesisRoot common.Hash) bool { func (v *VersionDB) Initialized(genesisRoot common.Hash) bool {
return v.db.HasState(genesisRoot) version, _ := v.db.LatestStoreDiskVersionInfo()
return version >= 0
} }
func (v *VersionDB) Size() (common.StorageSize, common.StorageSize, common.StorageSize) { func (v *VersionDB) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {