eth, trie/triedb/pathdb: pbss patches (#1955)
* fix: use the top root hash for rewinding under path schema * feat: add async flush nodebuffer in path schema * chore: add prun-block param suffix check * fix: code review comments
This commit is contained in:
parent
62862471f8
commit
53559fc4d7
@ -97,6 +97,7 @@ var (
|
||||
utils.TransactionHistoryFlag,
|
||||
utils.StateSchemeFlag,
|
||||
utils.StateHistoryFlag,
|
||||
utils.PathDBSyncFlag,
|
||||
utils.LightServeFlag,
|
||||
utils.LightIngressFlag,
|
||||
utils.LightEgressFlag,
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
@ -360,7 +361,21 @@ func pruneBlock(ctx *cli.Context) error {
|
||||
if path == "" {
|
||||
return errors.New("prune failed, did not specify the AncientPath")
|
||||
}
|
||||
newAncientPath = filepath.Join(path, "ancient_back")
|
||||
newVersionPath := false
|
||||
files, err := os.ReadDir(oldAncientPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.IsDir() && file.Name() == "chain" {
|
||||
newVersionPath = true
|
||||
}
|
||||
}
|
||||
if newVersionPath && !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
|
||||
log.Error("datadir.ancient subdirectory incorrect", "got path", oldAncientPath, "want subdirectory", "geth/chaindata/ancient/chain/")
|
||||
return errors.New("datadir.ancient subdirectory incorrect")
|
||||
}
|
||||
newAncientPath = filepath.Join(path, "chain_back")
|
||||
|
||||
blockpruner = pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)
|
||||
|
||||
|
@ -314,6 +314,12 @@ var (
|
||||
Value: rawdb.HashScheme,
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
PathDBSyncFlag = &cli.BoolFlag{
|
||||
Name: "pathdb.sync",
|
||||
Usage: "sync flush nodes cache to disk in path schema",
|
||||
Value: false,
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
StateHistoryFlag = &cli.Uint64Flag{
|
||||
Name: "history.state",
|
||||
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
|
||||
@ -1951,6 +1957,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
||||
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
|
||||
cfg.TransactionHistory = ctx.Uint64(TransactionHistoryFlag.Name)
|
||||
}
|
||||
if ctx.IsSet(PathDBSyncFlag.Name) {
|
||||
cfg.PathSyncFlush = true
|
||||
}
|
||||
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
|
||||
cfg.TransactionHistory = 0
|
||||
log.Warn("Disabled transaction unindexing for archive node")
|
||||
|
@ -155,6 +155,7 @@ type CacheConfig struct {
|
||||
NoTries bool // Insecure settings. Do not have any tries in databases if enabled.
|
||||
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
|
||||
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
|
||||
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.
|
||||
|
||||
SnapshotNoBuild bool // Whether the background generation is allowed
|
||||
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
|
||||
@ -174,6 +175,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
|
||||
}
|
||||
if c.StateScheme == rawdb.PathScheme {
|
||||
config.PathDB = &pathdb.Config{
|
||||
SyncFlush: c.PathSyncFlush,
|
||||
StateHistory: c.StateHistory,
|
||||
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
|
||||
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
|
||||
@ -396,11 +398,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
|
||||
var diskRoot common.Hash
|
||||
if bc.cacheConfig.SnapshotLimit > 0 {
|
||||
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
|
||||
} else if bc.triedb.Scheme() == rawdb.PathScheme {
|
||||
_, diskRoot = rawdb.ReadAccountTrieNode(bc.db, nil)
|
||||
}
|
||||
if bc.triedb.Scheme() == rawdb.PathScheme {
|
||||
recoverable, _ := bc.triedb.Recoverable(diskRoot)
|
||||
if !bc.HasState(diskRoot) && !recoverable {
|
||||
diskRoot = bc.triedb.Head()
|
||||
}
|
||||
}
|
||||
if diskRoot != (common.Hash{}) {
|
||||
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)
|
||||
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "diskRoot", diskRoot)
|
||||
|
||||
snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
|
||||
if err != nil {
|
||||
@ -689,18 +695,18 @@ func (bc *BlockChain) loadLastState() error {
|
||||
blockTd = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
|
||||
)
|
||||
if headHeader.Hash() != headBlock.Hash() {
|
||||
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
|
||||
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "hash", headHeader.Root, "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
|
||||
}
|
||||
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
|
||||
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "root", headBlock.Root(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
|
||||
if headBlock.Hash() != currentSnapBlock.Hash() {
|
||||
snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
|
||||
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
|
||||
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "root", currentSnapBlock.Root, "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
|
||||
}
|
||||
if posa, ok := bc.engine.(consensus.PoSA); ok {
|
||||
if currentFinalizedHeader := posa.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
|
||||
if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
|
||||
finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
|
||||
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
|
||||
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -802,6 +808,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
|
||||
|
||||
// resetState resets the persistent state to genesis if it's not available.
|
||||
resetState := func() {
|
||||
log.Info("Reset to block with genesis state", "number", bc.genesisBlock.NumberU64(), "hash", bc.genesisBlock.Hash())
|
||||
// Short circuit if the genesis state is already present.
|
||||
if bc.HasState(bc.genesisBlock.Root()) {
|
||||
return
|
||||
@ -825,7 +832,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
|
||||
// chain reparation mechanism without deleting any data!
|
||||
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.Number.Uint64() {
|
||||
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
|
||||
lastBlockNum := header.Number.Uint64()
|
||||
if newHeadBlock == nil {
|
||||
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
|
||||
newHeadBlock = bc.genesisBlock
|
||||
@ -835,10 +841,8 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
|
||||
// keeping rewinding until we exceed the optional threshold
|
||||
// root hash
|
||||
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
|
||||
enoughBeyondCount := false
|
||||
beyondCount := 0
|
||||
|
||||
for {
|
||||
beyondCount++
|
||||
// If a root threshold was requested but not yet crossed, check
|
||||
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
|
||||
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
|
||||
@ -854,24 +858,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
|
||||
log.Error("Missing block in the middle, aiming genesis", "number", newHeadBlock.NumberU64()-1, "hash", newHeadBlock.ParentHash())
|
||||
newHeadBlock = bc.genesisBlock
|
||||
} else {
|
||||
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
|
||||
log.Info("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
|
||||
newHeadBlock = bc.genesisBlock
|
||||
}
|
||||
}
|
||||
if beyondRoot || (enoughBeyondCount && root != common.Hash{}) || newHeadBlock.NumberU64() == 0 {
|
||||
if enoughBeyondCount && (root != common.Hash{}) && rootNumber == 0 {
|
||||
for {
|
||||
lastBlockNum++
|
||||
block := bc.GetBlockByNumber(lastBlockNum)
|
||||
if block == nil {
|
||||
break
|
||||
}
|
||||
if block.Root() == root {
|
||||
rootNumber = block.NumberU64()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if beyondRoot || newHeadBlock.NumberU64() == 0 {
|
||||
if newHeadBlock.NumberU64() == 0 {
|
||||
resetState()
|
||||
} else if !bc.HasState(newHeadBlock.Root()) {
|
||||
@ -1215,7 +1206,7 @@ func (bc *BlockChain) Stop() {
|
||||
for !bc.triegc.Empty() {
|
||||
triedb.Dereference(bc.triegc.PopItem())
|
||||
}
|
||||
if size, _ := triedb.Size(); size != 0 {
|
||||
if _, size, _, _ := triedb.Size(); size != 0 {
|
||||
log.Error("Dangling trie nodes after full cleanup")
|
||||
}
|
||||
}
|
||||
@ -1619,7 +1610,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
||||
if current := block.NumberU64(); current > bc.triesInMemory {
|
||||
// If we exceeded our memory allowance, flush matured singleton nodes to disk
|
||||
var (
|
||||
nodes, imgs = triedb.Size()
|
||||
_, nodes, _, imgs = triedb.Size()
|
||||
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
|
||||
)
|
||||
if nodes > limit || imgs > 4*1024*1024 {
|
||||
@ -2104,8 +2095,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
|
||||
stats.processed++
|
||||
stats.usedGas += usedGas
|
||||
|
||||
dirty, _ := bc.triedb.Size()
|
||||
stats.report(chain, it.index, dirty, setHead)
|
||||
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
|
||||
stats.report(chain, it.index, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
|
||||
|
||||
if !setHead {
|
||||
// After merge we expect few side chains. Simply count
|
||||
|
@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second
|
||||
|
||||
// report prints statistics if some number of blocks have been processed
|
||||
// or more than a few seconds have passed since the last message.
|
||||
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
|
||||
func (st *insertStats) report(chain []*types.Block, index int, trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
|
||||
// Fetch the timings for the batch
|
||||
var (
|
||||
now = mclock.Now()
|
||||
@ -63,7 +63,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
|
||||
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
|
||||
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
|
||||
}
|
||||
context = append(context, []interface{}{"dirty", dirty}...)
|
||||
if trieDiffNodes != 0 { // pathdb
|
||||
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
|
||||
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
|
||||
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)
|
||||
} else {
|
||||
context = append(context, []interface{}{"triedirty", trieBufNodes}...)
|
||||
}
|
||||
|
||||
if st.queued > 0 {
|
||||
context = append(context, []interface{}{"queued", st.queued}...)
|
||||
|
@ -1807,7 +1807,7 @@ func TestTrieForkGC(t *testing.T) {
|
||||
chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
|
||||
chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
|
||||
}
|
||||
if nodes, _ := chain.TrieDB().Size(); nodes > 0 {
|
||||
if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 {
|
||||
t.Fatalf("stale tries still alive after garbase collection")
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||
// dirty cache to the clean cache.
|
||||
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
|
||||
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
|
||||
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
|
||||
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
|
||||
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
|
||||
}
|
||||
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
|
||||
@ -225,6 +225,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||
Preimages: config.Preimages,
|
||||
StateHistory: config.StateHistory,
|
||||
StateScheme: config.StateScheme,
|
||||
PathSyncFlush: config.PathSyncFlush,
|
||||
}
|
||||
)
|
||||
bcOps := make([]core.BlockChainOption, 0)
|
||||
|
@ -123,6 +123,7 @@ type Config struct {
|
||||
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
|
||||
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
|
||||
StateScheme string `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
|
||||
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
|
||||
|
||||
// RequiredBlocks is a set of block number -> hash mappings which must be in the
|
||||
// canonical chain of all remote peers. Setting the option makes geth verify the
|
||||
|
@ -45,7 +45,6 @@ import (
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -979,7 +978,9 @@ func (h *handler) voteBroadcastLoop() {
|
||||
// sync is finished.
|
||||
func (h *handler) enableSyncedFeatures() {
|
||||
h.acceptTxs.Store(true)
|
||||
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
|
||||
h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
|
||||
}
|
||||
// In the bsc scenario, pathdb.MaxDirtyBufferSize (256MB) will be used.
|
||||
// The performance is better than DefaultDirtyBufferSize (64MB).
|
||||
//if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
|
||||
// h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
|
||||
//}
|
||||
}
|
||||
|
@ -175,8 +175,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
|
||||
parent = root
|
||||
}
|
||||
if report {
|
||||
nodes, imgs := triedb.Size()
|
||||
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
|
||||
diff, nodes, immutablenodes, imgs := triedb.Size()
|
||||
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "layer", diff, "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
|
||||
}
|
||||
return statedb, func() { triedb.Dereference(block.Root()) }, nil
|
||||
}
|
||||
|
@ -372,8 +372,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
// if the relevant state is available in disk.
|
||||
var preferDisk bool
|
||||
if statedb != nil {
|
||||
s1, s2 := statedb.Database().TrieDB().Size()
|
||||
preferDisk = s1+s2 > defaultTracechainMemLimit
|
||||
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
|
||||
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
|
||||
}
|
||||
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
|
||||
if err != nil {
|
||||
|
@ -61,7 +61,7 @@ type backend interface {
|
||||
|
||||
// Size returns the current storage size of the memory cache in front of the
|
||||
// persistent database layer.
|
||||
Size() common.StorageSize
|
||||
Size() (common.StorageSize, common.StorageSize, common.StorageSize)
|
||||
|
||||
// Update performs a state transition by committing dirty nodes contained
|
||||
// in the given set in order to update state from the specified parent to
|
||||
@ -207,16 +207,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {
|
||||
|
||||
// Size returns the storage size of dirty trie nodes in front of the persistent
|
||||
// database and the size of cached preimages.
|
||||
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
|
||||
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
|
||||
var (
|
||||
storages common.StorageSize
|
||||
diffs, nodes, immutablenodes common.StorageSize
|
||||
preimages common.StorageSize
|
||||
)
|
||||
storages = db.backend.Size()
|
||||
diffs, nodes, immutablenodes = db.backend.Size()
|
||||
if db.preimages != nil {
|
||||
preimages = db.preimages.size()
|
||||
}
|
||||
return storages, preimages
|
||||
return diffs, nodes, immutablenodes, preimages
|
||||
}
|
||||
|
||||
// Initialized returns an indicator if the state data is already initialized
|
||||
@ -353,3 +353,14 @@ func (db *Database) SetBufferSize(size int) error {
|
||||
}
|
||||
return pdb.SetBufferSize(size)
|
||||
}
|
||||
|
||||
// Head return the top non-fork difflayer/disklayer root hash for rewinding.
|
||||
// It's only supported by path-based database and will return an error for
|
||||
// others.
|
||||
func (db *Database) Head() common.Hash {
|
||||
pdb, ok := db.backend.(*pathdb.Database)
|
||||
if !ok {
|
||||
return common.Hash{}
|
||||
}
|
||||
return pdb.Head()
|
||||
}
|
||||
|
@ -641,7 +641,7 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
|
||||
|
||||
// Size returns the current storage size of the memory cache in front of the
|
||||
// persistent database layer.
|
||||
func (db *Database) Size() common.StorageSize {
|
||||
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
|
||||
db.lock.RLock()
|
||||
defer db.lock.RUnlock()
|
||||
|
||||
@ -649,7 +649,7 @@ func (db *Database) Size() common.StorageSize {
|
||||
// the total memory consumption, the maintenance metadata is also needed to be
|
||||
// counted.
|
||||
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
|
||||
return db.dirtiesSize + db.childrenSize + metadataSize
|
||||
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
|
||||
}
|
||||
|
||||
// Close closes the trie database and releases all held resources.
|
||||
|
448
trie/triedb/pathdb/asyncnodebuffer.go
Normal file
448
trie/triedb/pathdb/asyncnodebuffer.go
Normal file
@ -0,0 +1,448 @@
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
)
|
||||
|
||||
var _ trienodebuffer = &asyncnodebuffer{}
|
||||
|
||||
// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
|
||||
// to disk.
|
||||
type asyncnodebuffer struct {
|
||||
mux sync.RWMutex
|
||||
current *nodecache
|
||||
background *nodecache
|
||||
}
|
||||
|
||||
// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
|
||||
func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer {
|
||||
if nodes == nil {
|
||||
nodes = make(map[common.Hash]map[string]*trienode.Node)
|
||||
}
|
||||
var size uint64
|
||||
for _, subset := range nodes {
|
||||
for path, n := range subset {
|
||||
size += uint64(len(n.Blob) + len(path))
|
||||
}
|
||||
}
|
||||
|
||||
return &asyncnodebuffer{
|
||||
current: newNodeCache(uint64(limit), size, nodes, layers),
|
||||
background: newNodeCache(uint64(limit), 0, make(map[common.Hash]map[string]*trienode.Node), 0),
|
||||
}
|
||||
}
|
||||
|
||||
// node retrieves the trie node with given node info.
|
||||
func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
|
||||
a.mux.RLock()
|
||||
defer a.mux.RUnlock()
|
||||
|
||||
node, err := a.current.node(owner, path, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if node == nil {
|
||||
return a.background.node(owner, path, hash)
|
||||
}
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// commit merges the dirty nodes into the nodebuffer. This operation won't take
|
||||
// the ownership of the nodes map which belongs to the bottom-most diff layer.
|
||||
// It will just hold the node references from the given map which are safe to
|
||||
// copy.
|
||||
func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer {
|
||||
a.mux.Lock()
|
||||
defer a.mux.Unlock()
|
||||
|
||||
err := a.current.commit(nodes)
|
||||
if err != nil {
|
||||
log.Crit("[BUG] failed to commit nodes to asyncnodebuffer", "error", err)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// revert is the reverse operation of commit. It also merges the provided nodes
|
||||
// into the nodebuffer, the difference is that the provided node set should
|
||||
// revert the changes made by the last state transition.
|
||||
func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
|
||||
a.mux.Lock()
|
||||
defer a.mux.Unlock()
|
||||
|
||||
var err error
|
||||
a.current, err = a.current.merge(a.background)
|
||||
if err != nil {
|
||||
log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
|
||||
}
|
||||
a.background.reset()
|
||||
return a.current.revert(db, nodes)
|
||||
}
|
||||
|
||||
// setSize is unsupported in asyncnodebuffer, due to the double buffer, blocking will occur.
|
||||
func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
|
||||
return errors.New("not supported")
|
||||
}
|
||||
|
||||
// reset cleans up the disk cache.
|
||||
func (a *asyncnodebuffer) reset() {
|
||||
a.mux.Lock()
|
||||
defer a.mux.Unlock()
|
||||
|
||||
a.current.reset()
|
||||
a.background.reset()
|
||||
}
|
||||
|
||||
// empty returns an indicator if nodebuffer contains any state transition inside.
|
||||
func (a *asyncnodebuffer) empty() bool {
|
||||
a.mux.RLock()
|
||||
defer a.mux.RUnlock()
|
||||
|
||||
return a.current.empty() && a.background.empty()
|
||||
}
|
||||
|
||||
// setSize sets the buffer size to the provided number, and invokes a flush
|
||||
// operation if the current memory usage exceeds the new limit.
|
||||
//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
|
||||
// b.limit = uint64(size)
|
||||
// return b.flush(db, clean, id, false)
|
||||
//}
|
||||
|
||||
// flush persists the in-memory dirty trie node into the disk if the configured
|
||||
// memory threshold is reached. Note, all data must be written atomically.
|
||||
func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
|
||||
a.mux.Lock()
|
||||
defer a.mux.Unlock()
|
||||
|
||||
if force {
|
||||
for {
|
||||
if atomic.LoadUint64(&a.background.immutable) == 1 {
|
||||
time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
|
||||
log.Info("waiting background memory table flush to disk for force flush node buffer")
|
||||
continue
|
||||
}
|
||||
atomic.StoreUint64(&a.current.immutable, 1)
|
||||
return a.current.flush(db, clean, id)
|
||||
}
|
||||
}
|
||||
|
||||
if a.current.size < a.current.limit {
|
||||
return nil
|
||||
}
|
||||
|
||||
// background flush doing
|
||||
if atomic.LoadUint64(&a.background.immutable) == 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
atomic.StoreUint64(&a.current.immutable, 1)
|
||||
a.current, a.background = a.background, a.current
|
||||
|
||||
go func(persistId uint64) {
|
||||
for {
|
||||
err := a.background.flush(db, clean, persistId)
|
||||
if err == nil {
|
||||
log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
|
||||
return
|
||||
}
|
||||
log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
|
||||
}
|
||||
}(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
|
||||
a.mux.Lock()
|
||||
defer a.mux.Unlock()
|
||||
|
||||
cached, err := a.current.merge(a.background)
|
||||
if err != nil {
|
||||
log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err)
|
||||
}
|
||||
return cached.nodes
|
||||
}
|
||||
|
||||
func (a *asyncnodebuffer) getLayers() uint64 {
|
||||
a.mux.RLock()
|
||||
defer a.mux.RUnlock()
|
||||
|
||||
return a.current.layers + a.background.layers
|
||||
}
|
||||
|
||||
func (a *asyncnodebuffer) getSize() (uint64, uint64) {
|
||||
a.mux.RLock()
|
||||
defer a.mux.RUnlock()
|
||||
|
||||
return a.current.size, a.background.size
|
||||
}
|
||||
|
||||
type nodecache struct {
|
||||
layers uint64 // The number of diff layers aggregated inside
|
||||
size uint64 // The size of aggregated writes
|
||||
limit uint64 // The maximum memory allowance in bytes
|
||||
nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
|
||||
immutable uint64 // The flag equal 1, flush nodes to disk background
|
||||
}
|
||||
|
||||
func newNodeCache(limit, size uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodecache {
|
||||
return &nodecache{
|
||||
layers: layers,
|
||||
size: size,
|
||||
limit: limit,
|
||||
nodes: nodes,
|
||||
immutable: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (nc *nodecache) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
|
||||
subset, ok := nc.nodes[owner]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
n, ok := subset[string(path)]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
if n.Hash != hash {
|
||||
dirtyFalseMeter.Mark(1)
|
||||
log.Error("Unexpected trie node in node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
|
||||
return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) error {
|
||||
if atomic.LoadUint64(&nc.immutable) == 1 {
|
||||
return errWriteImmutable
|
||||
}
|
||||
var (
|
||||
delta int64
|
||||
overwrite int64
|
||||
overwriteSize int64
|
||||
)
|
||||
for owner, subset := range nodes {
|
||||
current, exist := nc.nodes[owner]
|
||||
if !exist {
|
||||
// Allocate a new map for the subset instead of claiming it directly
|
||||
// from the passed map to avoid potential concurrent map read/write.
|
||||
// The nodes belong to original diff layer are still accessible even
|
||||
// after merging, thus the ownership of nodes map should still belong
|
||||
// to original layer and any mutation on it should be prevented.
|
||||
current = make(map[string]*trienode.Node)
|
||||
for path, n := range subset {
|
||||
current[path] = n
|
||||
delta += int64(len(n.Blob) + len(path))
|
||||
}
|
||||
nc.nodes[owner] = current
|
||||
continue
|
||||
}
|
||||
for path, n := range subset {
|
||||
if orig, exist := current[path]; !exist {
|
||||
delta += int64(len(n.Blob) + len(path))
|
||||
} else {
|
||||
delta += int64(len(n.Blob) - len(orig.Blob))
|
||||
overwrite++
|
||||
overwriteSize += int64(len(orig.Blob) + len(path))
|
||||
}
|
||||
current[path] = n
|
||||
}
|
||||
nc.nodes[owner] = current
|
||||
}
|
||||
nc.updateSize(delta)
|
||||
nc.layers++
|
||||
gcNodesMeter.Mark(overwrite)
|
||||
gcBytesMeter.Mark(overwriteSize)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nc *nodecache) updateSize(delta int64) {
|
||||
size := int64(nc.size) + delta
|
||||
if size >= 0 {
|
||||
nc.size = uint64(size)
|
||||
return
|
||||
}
|
||||
s := nc.size
|
||||
nc.size = 0
|
||||
log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(s), "delta", common.StorageSize(delta))
|
||||
}
|
||||
|
||||
func (nc *nodecache) reset() {
|
||||
atomic.StoreUint64(&nc.immutable, 0)
|
||||
nc.layers = 0
|
||||
nc.size = 0
|
||||
nc.nodes = make(map[common.Hash]map[string]*trienode.Node)
|
||||
}
|
||||
|
||||
func (nc *nodecache) empty() bool {
|
||||
return nc.layers == 0
|
||||
}
|
||||
|
||||
func (nc *nodecache) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
|
||||
if atomic.LoadUint64(&nc.immutable) != 1 {
|
||||
return errFlushMutable
|
||||
}
|
||||
|
||||
// Ensure the target state id is aligned with the internal counter.
|
||||
head := rawdb.ReadPersistentStateID(db)
|
||||
if head+nc.layers != id {
|
||||
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", nc.layers, head, id)
|
||||
}
|
||||
var (
|
||||
start = time.Now()
|
||||
batch = db.NewBatchWithSize(int(float64(nc.size) * DefaultBatchRedundancyRate))
|
||||
)
|
||||
nodes := writeNodes(batch, nc.nodes, clean)
|
||||
rawdb.WritePersistentStateID(batch, id)
|
||||
|
||||
// Flush all mutations in a single batch
|
||||
size := batch.ValueSize()
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
commitBytesMeter.Mark(int64(size))
|
||||
commitNodesMeter.Mark(int64(nodes))
|
||||
commitTimeTimer.UpdateSince(start)
|
||||
log.Debug("Persisted pathdb nodes", "nodes", len(nc.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
nc.reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
|
||||
if nc == nil && nc1 == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if nc == nil || nc.empty() {
|
||||
res := copyNodeCache(nc1)
|
||||
atomic.StoreUint64(&res.immutable, 0)
|
||||
return nc1, nil
|
||||
}
|
||||
if nc1 == nil || nc1.empty() {
|
||||
res := copyNodeCache(nc)
|
||||
atomic.StoreUint64(&res.immutable, 0)
|
||||
return nc, nil
|
||||
}
|
||||
if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
|
||||
return nil, errIncompatibleMerge
|
||||
}
|
||||
|
||||
var (
|
||||
immutable *nodecache
|
||||
mutable *nodecache
|
||||
res = &nodecache{}
|
||||
)
|
||||
if atomic.LoadUint64(&nc.immutable) == 1 {
|
||||
immutable = nc
|
||||
mutable = nc1
|
||||
} else {
|
||||
immutable = nc1
|
||||
mutable = nc
|
||||
}
|
||||
res.size = immutable.size + mutable.size
|
||||
res.layers = immutable.layers + mutable.layers
|
||||
res.limit = immutable.size
|
||||
res.nodes = make(map[common.Hash]map[string]*trienode.Node)
|
||||
for acc, subTree := range immutable.nodes {
|
||||
if _, ok := res.nodes[acc]; !ok {
|
||||
res.nodes[acc] = make(map[string]*trienode.Node)
|
||||
}
|
||||
for path, node := range subTree {
|
||||
res.nodes[acc][path] = node
|
||||
}
|
||||
}
|
||||
|
||||
for acc, subTree := range mutable.nodes {
|
||||
if _, ok := res.nodes[acc]; !ok {
|
||||
res.nodes[acc] = make(map[string]*trienode.Node)
|
||||
}
|
||||
for path, node := range subTree {
|
||||
res.nodes[acc][path] = node
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
|
||||
if atomic.LoadUint64(&nc.immutable) == 1 {
|
||||
return errRevertImmutable
|
||||
}
|
||||
|
||||
// Short circuit if no embedded state transition to revert.
|
||||
if nc.layers == 0 {
|
||||
return errStateUnrecoverable
|
||||
}
|
||||
nc.layers--
|
||||
|
||||
// Reset the entire buffer if only a single transition left.
|
||||
if nc.layers == 0 {
|
||||
nc.reset()
|
||||
return nil
|
||||
}
|
||||
var delta int64
|
||||
for owner, subset := range nodes {
|
||||
current, ok := nc.nodes[owner]
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("non-existent subset (%x)", owner))
|
||||
}
|
||||
for path, n := range subset {
|
||||
orig, ok := current[path]
|
||||
if !ok {
|
||||
// There is a special case in MPT that one child is removed from
|
||||
// a fullNode which only has two children, and then a new child
|
||||
// with different position is immediately inserted into the fullNode.
|
||||
// In this case, the clean child of the fullNode will also be
|
||||
// marked as dirty because of node collapse and expansion.
|
||||
//
|
||||
// In case of database rollback, don't panic if this "clean"
|
||||
// node occurs which is not present in buffer.
|
||||
var nhash common.Hash
|
||||
if owner == (common.Hash{}) {
|
||||
_, nhash = rawdb.ReadAccountTrieNode(db, []byte(path))
|
||||
} else {
|
||||
_, nhash = rawdb.ReadStorageTrieNode(db, owner, []byte(path))
|
||||
}
|
||||
// Ignore the clean node in the case described above.
|
||||
if nhash == n.Hash {
|
||||
continue
|
||||
}
|
||||
panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex()))
|
||||
}
|
||||
current[path] = n
|
||||
delta += int64(len(n.Blob)) - int64(len(orig.Blob))
|
||||
}
|
||||
}
|
||||
nc.updateSize(delta)
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyNodeCache(n *nodecache) *nodecache {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
nc := &nodecache{
|
||||
layers: n.layers,
|
||||
size: n.size,
|
||||
limit: n.limit,
|
||||
immutable: atomic.LoadUint64(&n.immutable),
|
||||
nodes: make(map[common.Hash]map[string]*trienode.Node),
|
||||
}
|
||||
for acc, subTree := range n.nodes {
|
||||
if _, ok := nc.nodes[acc]; !ok {
|
||||
nc.nodes[acc] = make(map[string]*trienode.Node)
|
||||
}
|
||||
for path, node := range subTree {
|
||||
nc.nodes[acc][path] = node
|
||||
}
|
||||
}
|
||||
return nc
|
||||
}
|
@ -52,6 +52,14 @@ const (
|
||||
// Do not increase the buffer size arbitrarily, otherwise the system
|
||||
// pause time will increase when the database writes happen.
|
||||
DefaultDirtyBufferSize = 64 * 1024 * 1024
|
||||
|
||||
// DefaultBackgroundFlushInterval defines the default the wait interval
|
||||
// that background node cache flush disk.
|
||||
DefaultBackgroundFlushInterval = 3
|
||||
|
||||
// DefaultBatchRedundancyRate defines the batch size, compatible write
|
||||
// size calculation is inaccurate
|
||||
DefaultBatchRedundancyRate = 1.1
|
||||
)
|
||||
|
||||
// layer is the interface implemented by all state layers which includes some
|
||||
@ -86,6 +94,7 @@ type layer interface {
|
||||
|
||||
// Config contains the settings for database.
|
||||
type Config struct {
|
||||
SyncFlush bool // Flag of trienodebuffer sync flush cache to disk
|
||||
StateHistory uint64 // Number of recent blocks to maintain state history for
|
||||
CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes
|
||||
DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes
|
||||
@ -180,7 +189,7 @@ func New(diskdb ethdb.Database, config *Config) *Database {
|
||||
log.Warn("Truncated extra state histories", "number", pruned)
|
||||
}
|
||||
}
|
||||
log.Warn("Path-based state scheme is an experimental feature")
|
||||
log.Warn("Path-based state scheme is an experimental feature", "sync", db.config.SyncFlush)
|
||||
return db
|
||||
}
|
||||
|
||||
@ -283,7 +292,7 @@ func (db *Database) Reset(root common.Hash) error {
|
||||
}
|
||||
// Re-construct a new disk layer backed by persistent state
|
||||
// with **empty clean cache and node buffer**.
|
||||
dl := newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0))
|
||||
dl := newDiskLayer(root, 0, db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nil, 0))
|
||||
db.tree.reset(dl)
|
||||
log.Info("Rebuilt trie database", "root", root)
|
||||
return nil
|
||||
@ -384,16 +393,16 @@ func (db *Database) Close() error {
|
||||
|
||||
// Size returns the current storage size of the memory cache in front of the
|
||||
// persistent database layer.
|
||||
func (db *Database) Size() (size common.StorageSize) {
|
||||
func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, immutableNodes common.StorageSize) {
|
||||
db.tree.forEach(func(layer layer) {
|
||||
if diff, ok := layer.(*diffLayer); ok {
|
||||
size += common.StorageSize(diff.memory)
|
||||
diffs += common.StorageSize(diff.memory)
|
||||
}
|
||||
if disk, ok := layer.(*diskLayer); ok {
|
||||
size += disk.size()
|
||||
nodes, immutableNodes = disk.size()
|
||||
}
|
||||
})
|
||||
return size
|
||||
return diffs, nodes, immutableNodes
|
||||
}
|
||||
|
||||
// Initialized returns an indicator if the state data is already
|
||||
@ -425,3 +434,10 @@ func (db *Database) SetBufferSize(size int) error {
|
||||
func (db *Database) Scheme() string {
|
||||
return rawdb.PathScheme
|
||||
}
|
||||
|
||||
// Head return the top non-fork difflayer/disklayer root hash for rewinding.
|
||||
func (db *Database) Head() common.Hash {
|
||||
db.lock.Lock()
|
||||
defer db.lock.Unlock()
|
||||
return db.tree.front()
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
|
||||
dirtyWriteMeter.Mark(size)
|
||||
diffLayerNodesMeter.Mark(int64(count))
|
||||
diffLayerBytesMeter.Mark(int64(dl.memory))
|
||||
log.Debug("Created new diff layer", "id", id, "block", block, "nodes", count, "size", common.StorageSize(dl.memory))
|
||||
log.Debug("Created new diff layer", "id", id, "block", block, "nodes", count, "size", common.StorageSize(dl.memory), "root", dl.root)
|
||||
return dl
|
||||
}
|
||||
|
||||
|
@ -25,25 +25,77 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
"github.com/ethereum/go-ethereum/trie/triestate"
|
||||
"golang.org/x/crypto/sha3"
|
||||
)
|
||||
|
||||
// trienodebuffer is a collection of modified trie nodes to aggregate the disk
|
||||
// write. The content of the trienodebuffer must be checked before diving into
|
||||
// disk (since it basically is not-yet-written data).
|
||||
type trienodebuffer interface {
|
||||
// node retrieves the trie node with given node info.
|
||||
node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error)
|
||||
|
||||
// commit merges the dirty nodes into the trienodebuffer. This operation won't take
|
||||
// the ownership of the nodes map which belongs to the bottom-most diff layer.
|
||||
// It will just hold the node references from the given map which are safe to
|
||||
// copy.
|
||||
commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer
|
||||
|
||||
// revert is the reverse operation of commit. It also merges the provided nodes
|
||||
// into the trienodebuffer, the difference is that the provided node set should
|
||||
// revert the changes made by the last state transition.
|
||||
revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error
|
||||
|
||||
// flush persists the in-memory dirty trie node into the disk if the configured
|
||||
// memory threshold is reached. Note, all data must be written atomically.
|
||||
flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error
|
||||
|
||||
// setSize sets the buffer size to the provided number, and invokes a flush
|
||||
// operation if the current memory usage exceeds the new limit.
|
||||
setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error
|
||||
|
||||
// reset cleans up the disk cache.
|
||||
reset()
|
||||
|
||||
// empty returns an indicator if trienodebuffer contains any state transition inside.
|
||||
empty() bool
|
||||
|
||||
// getSize return the trienodebuffer used size.
|
||||
getSize() (uint64, uint64)
|
||||
|
||||
// getAllNodes return all the trie nodes are cached in trienodebuffer.
|
||||
getAllNodes() map[common.Hash]map[string]*trienode.Node
|
||||
|
||||
// getLayers return the size of cached difflayers.
|
||||
getLayers() uint64
|
||||
}
|
||||
|
||||
func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
|
||||
if sync {
|
||||
log.Info("new sync node buffer", "limit", common.StorageSize(limit), "layers", layers)
|
||||
return newNodeBuffer(limit, nodes, layers)
|
||||
}
|
||||
log.Info("new async node buffer", "limit", common.StorageSize(limit), "layers", layers)
|
||||
return newAsyncNodeBuffer(limit, nodes, layers)
|
||||
}
|
||||
|
||||
// diskLayer is a low level persistent layer built on top of a key-value store.
|
||||
type diskLayer struct {
|
||||
root common.Hash // Immutable, root hash to which this layer was made for
|
||||
id uint64 // Immutable, corresponding state id
|
||||
db *Database // Path-based trie database
|
||||
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
|
||||
buffer *nodebuffer // Node buffer to aggregate writes
|
||||
buffer trienodebuffer // Node buffer to aggregate writes
|
||||
stale bool // Signals that the layer became stale (state progressed)
|
||||
lock sync.RWMutex // Lock used to protect stale flag
|
||||
}
|
||||
|
||||
// newDiskLayer creates a new disk layer based on the passing arguments.
|
||||
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
|
||||
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer {
|
||||
// Initialize a clean cache if the memory allowance is not zero
|
||||
// or reuse the provided cache if it is not nil (inherited from
|
||||
// the original disk layer).
|
||||
@ -282,7 +334,7 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer
|
||||
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
|
||||
}
|
||||
|
||||
// setBufferSize sets the node buffer size to the provided value.
|
||||
// setBufferSize sets the trie node buffer size to the provided value.
|
||||
func (dl *diskLayer) setBufferSize(size int) error {
|
||||
dl.lock.RLock()
|
||||
defer dl.lock.RUnlock()
|
||||
@ -294,14 +346,15 @@ func (dl *diskLayer) setBufferSize(size int) error {
|
||||
}
|
||||
|
||||
// size returns the approximate size of cached nodes in the disk layer.
|
||||
func (dl *diskLayer) size() common.StorageSize {
|
||||
func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
|
||||
dl.lock.RLock()
|
||||
defer dl.lock.RUnlock()
|
||||
|
||||
if dl.stale {
|
||||
return 0
|
||||
return 0, 0
|
||||
}
|
||||
return common.StorageSize(dl.buffer.size)
|
||||
dirtyNodes, dirtyimmutableNodes := dl.buffer.getSize()
|
||||
return common.StorageSize(dirtyNodes), common.StorageSize(dirtyimmutableNodes)
|
||||
}
|
||||
|
||||
// resetCache releases the memory held by clean cache to prevent memory leak.
|
||||
|
@ -45,6 +45,20 @@ var (
|
||||
// errUnexpectedNode is returned if the requested node with specified path is
|
||||
// not hash matched with expectation.
|
||||
errUnexpectedNode = errors.New("unexpected node")
|
||||
|
||||
// errWriteImmutable is returned if write to background immutable nodecache
|
||||
// under asyncnodebuffer
|
||||
errWriteImmutable = errors.New("write immutable nodecache")
|
||||
|
||||
// errFlushMutable is returned if flush the background mutable nodecache
|
||||
// to disk, under asyncnodebuffer
|
||||
errFlushMutable = errors.New("flush mutable nodecache")
|
||||
|
||||
// errIncompatibleMerge is returned when merge node cache occurs error.
|
||||
errIncompatibleMerge = errors.New("incompatible nodecache merge")
|
||||
|
||||
// errRevertImmutable is returned if revert the background immutable nodecache
|
||||
errRevertImmutable = errors.New("revert immutable nodecache")
|
||||
)
|
||||
|
||||
func newUnexpectedNodeError(loc string, expHash common.Hash, gotHash common.Hash, owner common.Hash, path []byte, blob []byte) error {
|
||||
|
@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer {
|
||||
log.Info("Failed to load journal, discard it", "err", err)
|
||||
}
|
||||
// Return single layer with persistent state.
|
||||
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
|
||||
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nil, 0))
|
||||
}
|
||||
|
||||
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
|
||||
@ -170,7 +170,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
|
||||
nodes[entry.Owner] = subset
|
||||
}
|
||||
// Calculate the internal state transitions by id difference.
|
||||
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
|
||||
base := newDiskLayer(root, id, db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nodes, id-stored))
|
||||
return base, nil
|
||||
}
|
||||
|
||||
@ -260,8 +260,9 @@ func (dl *diskLayer) journal(w io.Writer) error {
|
||||
return err
|
||||
}
|
||||
// Step three, write all unwritten nodes into the journal
|
||||
nodes := make([]journalNodes, 0, len(dl.buffer.nodes))
|
||||
for owner, subset := range dl.buffer.nodes {
|
||||
bufferNodes := dl.buffer.getAllNodes()
|
||||
nodes := make([]journalNodes, 0, len(bufferNodes))
|
||||
for owner, subset := range bufferNodes {
|
||||
entry := journalNodes{Owner: owner}
|
||||
for path, node := range subset {
|
||||
entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
|
||||
@ -271,7 +272,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
|
||||
if err := rlp.Encode(w, nodes); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(dl.buffer.nodes))
|
||||
log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -344,9 +345,9 @@ func (db *Database) Journal(root common.Hash) error {
|
||||
}
|
||||
disk := db.tree.bottom()
|
||||
if l, ok := l.(*diffLayer); ok {
|
||||
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
|
||||
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.getLayers())
|
||||
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
|
||||
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
|
||||
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.getLayers())
|
||||
}
|
||||
start := time.Now()
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
"github.com/ethereum/go-ethereum/trie/triestate"
|
||||
)
|
||||
@ -212,3 +213,46 @@ func (tree *layerTree) bottom() *diskLayer {
|
||||
}
|
||||
return current.(*diskLayer)
|
||||
}
|
||||
|
||||
// front return the top non-fork difflayer/disklayer root hash for rewinding.
|
||||
func (tree *layerTree) front() common.Hash {
|
||||
tree.lock.RLock()
|
||||
defer tree.lock.RUnlock()
|
||||
|
||||
chain := make(map[common.Hash][]common.Hash)
|
||||
var base common.Hash
|
||||
for _, layer := range tree.layers {
|
||||
switch dl := layer.(type) {
|
||||
case *diskLayer:
|
||||
if dl.stale {
|
||||
log.Info("pathdb top disklayer is stale")
|
||||
return base
|
||||
}
|
||||
base = dl.rootHash()
|
||||
case *diffLayer:
|
||||
if _, ok := chain[dl.parentLayer().rootHash()]; !ok {
|
||||
chain[dl.parentLayer().rootHash()] = make([]common.Hash, 0)
|
||||
}
|
||||
chain[dl.parentLayer().rootHash()] = append(chain[dl.parentLayer().rootHash()], dl.rootHash())
|
||||
default:
|
||||
log.Crit("unsupported layer type")
|
||||
}
|
||||
}
|
||||
if (base == common.Hash{}) {
|
||||
log.Info("pathdb top difflayer is empty")
|
||||
return base
|
||||
}
|
||||
parent := base
|
||||
for {
|
||||
children, ok := chain[parent]
|
||||
if !ok {
|
||||
log.Info("pathdb top difflayer", "root", parent)
|
||||
return parent
|
||||
}
|
||||
if len(children) != 1 {
|
||||
log.Info("pathdb top difflayer is forked", "common ancestor root", parent)
|
||||
return parent
|
||||
}
|
||||
parent = children[0]
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,8 @@ import (
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
)
|
||||
|
||||
var _ trienodebuffer = &nodebuffer{}
|
||||
|
||||
// nodebuffer is a collection of modified trie nodes to aggregate the disk
|
||||
// write. The content of the nodebuffer must be checked before diving into
|
||||
// disk (since it basically is not-yet-written data).
|
||||
@ -80,7 +82,7 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr
|
||||
// the ownership of the nodes map which belongs to the bottom-most diff layer.
|
||||
// It will just hold the node references from the given map which are safe to
|
||||
// copy.
|
||||
func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer {
|
||||
func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer {
|
||||
var (
|
||||
delta int64
|
||||
overwrite int64
|
||||
@ -97,14 +99,14 @@ func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *no
|
||||
current = make(map[string]*trienode.Node)
|
||||
for path, n := range subset {
|
||||
current[path] = n
|
||||
delta += int64(len(n.Blob) + len(path))
|
||||
delta += int64(len(n.Blob) + len(path) + len(owner))
|
||||
}
|
||||
b.nodes[owner] = current
|
||||
continue
|
||||
}
|
||||
for path, n := range subset {
|
||||
if orig, exist := current[path]; !exist {
|
||||
delta += int64(len(n.Blob) + len(path))
|
||||
delta += int64(len(n.Blob) + len(path) + len(owner))
|
||||
} else {
|
||||
delta += int64(len(n.Blob) - len(orig.Blob))
|
||||
overwrite++
|
||||
@ -217,7 +219,11 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
|
||||
}
|
||||
var (
|
||||
start = time.Now()
|
||||
batch = db.NewBatchWithSize(int(b.size))
|
||||
// Although the calculation of b.size has been as accurate as possible,
|
||||
// some omissions were still found during testing and code review, but
|
||||
// we are still not sure it is completely accurate. For better protection,
|
||||
// some redundancy is added here.
|
||||
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
|
||||
)
|
||||
nodes := writeNodes(batch, b.nodes, clean)
|
||||
rawdb.WritePersistentStateID(batch, id)
|
||||
@ -273,3 +279,18 @@ func cacheKey(owner common.Hash, path []byte) []byte {
|
||||
}
|
||||
return append(owner.Bytes(), path...)
|
||||
}
|
||||
|
||||
// getSize return the nodebuffer used size.
|
||||
func (b *nodebuffer) getSize() (uint64, uint64) {
|
||||
return b.size, 0
|
||||
}
|
||||
|
||||
// getAllNodes return all the trie nodes are cached in nodebuffer.
|
||||
func (b *nodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
|
||||
return b.nodes
|
||||
}
|
||||
|
||||
// getLayers return the size of cached difflayers.
|
||||
func (b *nodebuffer) getLayers() uint64 {
|
||||
return b.layers
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user