bsc/triedb/pathdb/database.go
joeycli d20bbb4799 feat: integrate versa db
feat: add version commit

chore: forbid versiondb rewind

feat: add mode for new caching db

feat: add version scheme for startup

feat: init genesis for versa db

feat: add caching db and trie copy

feat: support HasState on versa db and must fullsync under versa db

fix: append open strorage trie error to statedb

fix: storage tree value encode

fix: add state object trie expire interface

fix: blockchain stateat use rw state

chore: forbid prefetcher

chore: delete storage pool

feat: hold version in state objet for repeat search account tree

fix: version mismatch that add contract balance without update storage tree

fix: 373559 blocks issue add breakpoint

feat: add version state debug system

feat: add hash state db debug system

feat: add version and hash state diff system

fix: the timing release debug state instance
2024-08-27 09:01:25 +08:00

576 lines
20 KiB
Go

// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"errors"
"fmt"
"io"
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate"
)
const (
// maxDiffLayers is the maximum diff layers allowed in the layer tree.
maxDiffLayers = 128
// defaultCleanSize is the default memory allowance of clean cache.
defaultCleanSize = 16 * 1024 * 1024
// MaxDirtyBufferSize is the maximum memory allowance of node buffer.
// Too large nodebuffer will cause the system to pause for a long
// time when write happens. Also, the largest batch that pebble can
// support is 4GB, node will panic if batch size exceeds this limit.
MaxDirtyBufferSize = 256 * 1024 * 1024
// DefaultDirtyBufferSize is the default memory allowance of node buffer
// that aggregates the writes from above until it's flushed into the
// disk. It's meant to be used once the initial sync is finished.
// Do not increase the buffer size arbitrarily, otherwise the system
// pause time will increase when the database writes happen.
DefaultDirtyBufferSize = 64 * 1024 * 1024
// DefaultBackgroundFlushInterval defines the default the wait interval
// that background node cache flush disk.
DefaultBackgroundFlushInterval = 3
// DefaultBatchRedundancyRate defines the batch size, compatible write
// size calculation is inaccurate
DefaultBatchRedundancyRate = 1.1
)
type JournalType int
const (
JournalKVType JournalType = iota
JournalFileType
)
// layer is the interface implemented by all state layers which includes some
// public methods and some additional methods for internal usage.
type layer interface {
// Node retrieves the trie node with the node info. An error will be returned
// if the read operation exits abnormally. For example, if the layer is already
// stale, or the associated state is regarded as corrupted. Notably, no error
// will be returned if the requested node is not found in database.
Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error)
// rootHash returns the root hash for which this layer was made.
rootHash() common.Hash
// stateID returns the associated state id of layer.
stateID() uint64
// parentLayer returns the subsequent layer of it, or nil if the disk was reached.
parentLayer() layer
// update creates a new layer on top of the existing layer diff tree with
// the provided dirty trie nodes along with the state change set.
//
// Note, the maps are retained by the method to avoid copying everything.
update(root common.Hash, id uint64, block uint64, nodes map[common.Hash]map[string]*trienode.Node, states *triestate.Set) *diffLayer
// journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer, journalType JournalType) error
}
// Config contains the settings for database.
type Config struct {
SyncFlush bool // Flag of trienodebuffer sync flush cache to disk
StateHistory uint64 // Number of recent blocks to maintain state history for
CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes
DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes
ReadOnly bool // Flag whether the database is opened in read only mode.
NoTries bool
JournalFilePath string
JournalFile bool
}
// sanitize checks the provided user configurations and changes anything that's
// unreasonable or unworkable.
func (c *Config) sanitize() *Config {
conf := *c
if conf.DirtyCacheSize > MaxDirtyBufferSize {
log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(MaxDirtyBufferSize))
conf.DirtyCacheSize = MaxDirtyBufferSize
}
return &conf
}
// Defaults contains default settings for Ethereum mainnet.
var Defaults = &Config{
StateHistory: params.FullImmutabilityThreshold,
CleanCacheSize: defaultCleanSize,
DirtyCacheSize: DefaultDirtyBufferSize,
}
// ReadOnly is the config in order to open database in read only mode.
var ReadOnly = &Config{ReadOnly: true}
// Database is a multiple-layered structure for maintaining in-memory trie nodes.
// It consists of one persistent base layer backed by a key-value store, on top
// of which arbitrarily many in-memory diff layers are stacked. The memory diffs
// can form a tree with branching, but the disk layer is singleton and common to
// all. If a reorg goes deeper than the disk layer, a batch of reverse diffs can
// be applied to rollback. The deepest reorg that can be handled depends on the
// amount of state histories tracked in the disk.
//
// At most one readable and writable database can be opened at the same time in
// the whole system which ensures that only one database writer can operate disk
// state. Unexpected open operations can cause the system to panic.
type Database struct {
// readOnly is the flag whether the mutation is allowed to be applied.
// It will be set automatically when the database is journaled during
// the shutdown to reject all following unexpected mutations.
readOnly bool // Flag if database is opened in read only mode
waitSync bool // Flag if database is deactivated due to initial state sync
bufferSize int // Memory allowance (in bytes) for caching dirty nodes
config *Config // Configuration for database
diskdb ethdb.Database // Persistent storage for matured trie nodes
tree *layerTree // The group for all known layers
freezer *rawdb.ResettableFreezer // Freezer for storing trie histories, nil possible in tests
lock sync.RWMutex // Lock to prevent mutations from happening at the same time
}
// New attempts to load an already existing layer from a persistent key-value
// store (with a number of memory layers from a journal). If the journal is not
// matched with the base persistent layer, all the recorded diff layers are discarded.
func New(diskdb ethdb.Database, config *Config) *Database {
if config == nil {
config = Defaults
}
config = config.sanitize()
db := &Database{
readOnly: config.ReadOnly,
bufferSize: config.DirtyCacheSize,
config: config,
diskdb: diskdb,
}
// Construct the layer tree by resolving the in-disk singleton state
// and in-memory layer journal.
db.tree = newLayerTree(db.loadLayers())
// Open the freezer for state history if the passed database contains an
// ancient store. Otherwise, all the relevant functionalities are disabled.
//
// Because the freezer can only be opened once at the same time, this
// mechanism also ensures that at most one **non-readOnly** database
// is opened at the same time to prevent accidental mutation.
if ancient, err := diskdb.AncientDatadir(); err == nil && ancient != "" && !db.readOnly && !config.NoTries {
offset := uint64(0) // differ from in block data, only metadata is used in state data
freezer, err := rawdb.NewStateFreezer(ancient, false, offset)
if err != nil {
log.Crit("Failed to open state history freezer", "err", err)
}
db.freezer = freezer
diskLayerID := db.tree.bottom().stateID()
if diskLayerID == 0 {
// Reset the entire state histories in case the trie database is
// not initialized yet, as these state histories are not expected.
frozen, err := db.freezer.Ancients()
if err != nil {
log.Crit("Failed to retrieve head of state history", "err", err)
}
if frozen != 0 {
err := db.freezer.Reset()
if err != nil {
log.Crit("Failed to reset state histories", "err", err)
}
log.Info("Truncated extraneous state history")
}
} else {
// Truncate the extra state histories above in freezer in case
// it's not aligned with the disk layer.
pruned, err := truncateFromHead(db.diskdb, freezer, diskLayerID)
if err != nil {
log.Crit("Failed to truncate extra state histories", "err", err)
}
if pruned != 0 {
log.Warn("Truncated extra state histories", "number", pruned)
}
}
}
// Disable database in case node is still in the initial state sync stage.
if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly {
if err := db.Disable(); err != nil {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
log.Info("success to init path mode triedb")
return db
}
// Reader retrieves a layer belonging to the given state root.
func (db *Database) Reader(root common.Hash) (layer, error) {
l := db.tree.get(root)
if l == nil {
return nil, fmt.Errorf("state %#x is not available", root)
}
return l, nil
}
// Update adds a new layer into the tree, if that can be linked to an existing
// old parent. It is disallowed to insert a disk layer (the origin of all). Apart
// from that this function will flatten the extra diff layers at bottom into disk
// to only keep 128 diff layers in memory by default.
//
// The passed in maps(nodes, states) will be retained to avoid copying everything.
// Therefore, these maps must not be changed afterwards.
func (db *Database) Update(root common.Hash, parentRoot common.Hash, block uint64, nodes *trienode.MergedNodeSet, states *triestate.Set) error {
// Hold the lock to prevent concurrent mutations.
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if the mutation is not allowed.
if err := db.modifyAllowed(); err != nil {
return err
}
if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil {
return err
}
// Keep 128 diff layers in the memory, persistent layer is 129th.
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
// - head-128 layer(disk layer) is paired with HEAD-128 state
return db.tree.cap(root, maxDiffLayers)
}
// Commit traverses downwards the layer tree from a specified layer with the
// provided state root and all the layers below are flattened downwards. It
// can be used alone and mostly for test purposes.
func (db *Database) Commit(root common.Hash, report bool) error {
// Hold the lock to prevent concurrent mutations.
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if the mutation is not allowed.
if err := db.modifyAllowed(); err != nil {
return err
}
return db.tree.cap(root, 0)
}
// Disable deactivates the database and invalidates all available state layers
// as stale to prevent access to the persistent state, which is in the syncing
// stage.
func (db *Database) Disable() error {
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if the database is in read only mode.
if db.readOnly {
return errDatabaseReadOnly
}
// Prevent duplicated disable operation.
if db.waitSync {
log.Error("Reject duplicated disable operation")
return nil
}
db.waitSync = true
// Mark the disk layer as stale to prevent access to persistent state.
db.tree.bottom().markStale()
// Write the initial sync flag to persist it across restarts.
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning)
log.Info("Disabled trie database due to state sync")
return nil
}
// Enable activates database and resets the state tree with the provided persistent
// state root once the state sync is finished.
func (db *Database) Enable(root common.Hash) error {
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if the database is in read only mode.
if db.readOnly {
return errDatabaseReadOnly
}
// Ensure the provided state root matches the stored one.
root = types.TrieRootHash(root)
_, stored := rawdb.ReadAccountTrieNode(db.diskdb, nil)
if stored != root {
return fmt.Errorf("state root mismatch: stored %x, synced %x", stored, root)
}
// Drop the stale state journal in persistent database and
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
db.DeleteTrieJournal(batch)
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
return err
}
// Clean up all state histories in freezer. Theoretically
// all root->id mappings should be removed as well. Since
// mappings can be huge and might take a while to clear
// them, just leave them in disk and wait for overwriting.
if db.freezer != nil {
if err := db.freezer.Reset(); err != nil {
return err
}
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
dl := newDiskLayer(root, 0, db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nil, 0))
db.tree.reset(dl)
// Re-enable the database as the final step.
db.waitSync = false
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished)
log.Info("Rebuilt trie database", "root", root)
return nil
}
// Recover rollbacks the database to a specified historical point.
// The state is supported as the rollback destination only if it's
// canonical state and the corresponding trie histories are existent.
func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error {
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if rollback operation is not supported.
if err := db.modifyAllowed(); err != nil {
return err
}
if db.freezer == nil {
return errors.New("state rollback is non-supported")
}
// Short circuit if the target state is not recoverable.
root = types.TrieRootHash(root)
if !db.Recoverable(root) {
return errStateUnrecoverable
}
// Apply the state histories upon the disk layer in order.
var (
start = time.Now()
dl = db.tree.bottom()
)
for dl.rootHash() != root {
h, err := readHistory(db.freezer, dl.stateID())
if err != nil {
return err
}
dl, err = dl.revert(h, loader)
if err != nil {
return err
}
// reset layer with newly created disk layer. It must be
// done after each revert operation, otherwise the new
// disk layer won't be accessible from outside.
db.tree.reset(dl)
}
db.DeleteTrieJournal(db.diskdb)
_, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID())
if err != nil {
return err
}
log.Debug("Recovered state", "root", root, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}
// Recoverable returns the indicator if the specified state is recoverable.
func (db *Database) Recoverable(root common.Hash) bool {
// Ensure the requested state is a known state.
root = types.TrieRootHash(root)
id := rawdb.ReadStateID(db.diskdb, root)
if id == nil {
return false
}
// Recoverable state must below the disk layer. The recoverable
// state only refers the state that is currently not available,
// but can be restored by applying state history.
dl := db.tree.bottom()
if *id >= dl.stateID() {
return false
}
// Ensure the requested state is a canonical state and all state
// histories in range [id+1, disklayer.ID] are present and complete.
parent := root
return checkHistories(db.freezer, *id+1, dl.stateID()-*id, func(m *meta) error {
if m.parent != parent {
return errors.New("unexpected state history")
}
if len(m.incomplete) > 0 {
return errors.New("incomplete state history")
}
parent = m.root
return nil
}) == nil
}
// Close closes the trie database and the held freezer.
func (db *Database) Close() error {
db.lock.Lock()
defer db.lock.Unlock()
// Set the database to read-only mode to prevent all
// following mutations.
db.readOnly = true
// Release the memory held by clean cache.
db.tree.bottom().resetCache()
// Close the attached state history freezer.
if db.freezer == nil {
return nil
}
return db.freezer.Close()
}
// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, immutableNodes common.StorageSize) {
db.tree.forEach(func(layer layer) {
if diff, ok := layer.(*diffLayer); ok {
diffs += common.StorageSize(diff.memory)
}
if disk, ok := layer.(*diskLayer); ok {
nodes, immutableNodes = disk.size()
}
})
return diffs, nodes, immutableNodes
}
// Initialized returns an indicator if the state data is already
// initialized in path-based scheme.
func (db *Database) Initialized(genesisRoot common.Hash) bool {
var inited bool
db.tree.forEach(func(layer layer) {
if layer.rootHash() != types.EmptyRootHash {
inited = true
}
})
if !inited {
inited = rawdb.ReadSnapSyncStatusFlag(db.diskdb) != rawdb.StateSyncUnknown
}
return inited
}
// SetBufferSize sets the node buffer size to the provided value(in bytes).
func (db *Database) SetBufferSize(size int) error {
db.lock.Lock()
defer db.lock.Unlock()
if size > MaxDirtyBufferSize {
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
size = MaxDirtyBufferSize
}
db.bufferSize = size
return db.tree.bottom().setBufferSize(db.bufferSize)
}
// Scheme returns the node scheme used in the database.
func (db *Database) Scheme() string {
return rawdb.PathScheme
}
// Head return the top non-fork difflayer/disklayer root hash for rewinding.
func (db *Database) Head() common.Hash {
db.lock.Lock()
defer db.lock.Unlock()
return db.tree.front()
}
// modifyAllowed returns the indicator if mutation is allowed. This function
// assumes the db.lock is already held.
func (db *Database) modifyAllowed() error {
if db.readOnly {
return errDatabaseReadOnly
}
if db.waitSync {
return errDatabaseWaitSync
}
return nil
}
// GetAllRooHash returns all diffLayer and diskLayer root hash
func (db *Database) GetAllRooHash() [][]string {
db.lock.Lock()
defer db.lock.Unlock()
data := make([][]string, 0, len(db.tree.layers))
for _, v := range db.tree.layers {
if dl, ok := v.(*diffLayer); ok {
data = append(data, []string{fmt.Sprintf("%d", dl.block), dl.rootHash().String()})
}
}
sort.Slice(data, func(i, j int) bool {
block1, _ := strconv.Atoi(data[i][0])
block2, _ := strconv.Atoi(data[j][0])
return block1 > block2
})
data = append(data, []string{"-1", db.tree.bottom().rootHash().String()})
return data
}
// DetermineJournalTypeForWriter is used when persisting the journal. It determines JournalType based on the config passed in by the Config.
func (db *Database) DetermineJournalTypeForWriter() JournalType {
if db.config.JournalFile {
return JournalFileType
} else {
return JournalKVType
}
}
// DetermineJournalTypeForReader is used when loading the journal. It loads based on whether JournalKV or JournalFile currently exists.
func (db *Database) DetermineJournalTypeForReader() JournalType {
if journal := rawdb.ReadTrieJournal(db.diskdb); len(journal) != 0 {
return JournalKVType
}
if fileInfo, stateErr := os.Stat(db.config.JournalFilePath); stateErr == nil && !fileInfo.IsDir() {
return JournalFileType
}
return JournalKVType
}
func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error {
// To prevent any remnants of old journals after converting from JournalKV to JournalFile or vice versa, all deletions must be completed.
rawdb.DeleteTrieJournal(writer)
// delete from journal file, may not exist
filePath := db.config.JournalFilePath
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil
}
errRemove := os.Remove(filePath)
if errRemove != nil {
log.Crit("Failed to remove tries journal", "journal path", filePath, "err", errRemove)
}
return nil
}