perf: speedup pbss trienode read (#2508)

This commit is contained in:
will-2012 2024-06-18 11:47:22 +08:00 committed by GitHub
parent f467c6018b
commit 99d31aeb28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 219 additions and 4 deletions

@ -161,12 +161,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Optimize memory distribution by reallocating surplus allowance from the // Optimize memory distribution by reallocating surplus allowance from the
// dirty cache to the clean cache. // dirty cache to the clean cache.
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 { if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize)) log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024,
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024) "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024,
"adjusted", common.StorageSize(config.TrieCleanCache+config.TrieDirtyCache-pathdb.MaxDirtyBufferSize/1024/1024)*1024*1024)
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024 config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
} }
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) log.Info("Allocated memory caches",
"state_scheme", config.StateScheme,
"trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024,
"trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024)
// Try to recover offline state pruning only in hash-based. // Try to recover offline state pruning only in hash-based.
if config.StateScheme == rawdb.HashScheme { if config.StateScheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, config.TriesInMemory); err != nil { if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, config.TriesInMemory); err != nil {

@ -26,6 +26,106 @@ import (
"github.com/ethereum/go-ethereum/trie/triestate" "github.com/ethereum/go-ethereum/trie/triestate"
) )
type RefTrieNode struct {
refCount uint32
node *trienode.Node
}
type HashNodeCache struct {
lock sync.RWMutex
cache map[common.Hash]*RefTrieNode
}
func (h *HashNodeCache) length() int {
if h == nil {
return 0
}
h.lock.RLock()
defer h.lock.RUnlock()
return len(h.cache)
}
func (h *HashNodeCache) set(hash common.Hash, node *trienode.Node) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
if n, ok := h.cache[hash]; ok {
n.refCount++
} else {
h.cache[hash] = &RefTrieNode{1, node}
}
}
func (h *HashNodeCache) Get(hash common.Hash) *trienode.Node {
if h == nil {
return nil
}
h.lock.RLock()
defer h.lock.RUnlock()
if n, ok := h.cache[hash]; ok {
return n.node
}
return nil
}
func (h *HashNodeCache) del(hash common.Hash) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
n, ok := h.cache[hash]
if !ok {
return
}
if n.refCount > 0 {
n.refCount--
}
if n.refCount == 0 {
delete(h.cache, hash)
}
}
func (h *HashNodeCache) Add(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
beforeAdd := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.set(node.Hash, node)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Add difflayer to hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "add_delta", h.length()-beforeAdd)
}
func (h *HashNodeCache) Remove(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
go func() {
beforeDel := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.del(node.Hash)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Remove difflayer from hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "del_delta", beforeDel-h.length())
}()
}
// diffLayer represents a collection of modifications made to the in-memory tries // diffLayer represents a collection of modifications made to the in-memory tries
// along with associated state changes after running a block on top. // along with associated state changes after running a block on top.
// //
@ -39,7 +139,10 @@ type diffLayer struct {
nodes map[common.Hash]map[string]*trienode.Node // Cached trie nodes indexed by owner and path nodes map[common.Hash]map[string]*trienode.Node // Cached trie nodes indexed by owner and path
states *triestate.Set // Associated state change set for building history states *triestate.Set // Associated state change set for building history
memory uint64 // Approximate guess as to how much memory we use memory uint64 // Approximate guess as to how much memory we use
cache *HashNodeCache // trienode cache by hash key. cache is immutable, but cache's item can be add/del.
// mutables
origin *diskLayer // The current difflayer corresponds to the underlying disklayer and is updated during cap.
parent layer // Parent layer modified by this one, never nil, **can be changed** parent layer // Parent layer modified by this one, never nil, **can be changed**
lock sync.RWMutex // Lock used to protect parent lock sync.RWMutex // Lock used to protect parent
} }
@ -58,6 +161,20 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
states: states, states: states,
parent: parent, parent: parent,
} }
switch l := parent.(type) {
case *diskLayer:
dl.origin = l
dl.cache = &HashNodeCache{
cache: make(map[common.Hash]*RefTrieNode),
}
case *diffLayer:
dl.origin = l.originDiskLayer()
dl.cache = l.cache
default:
panic("unknown parent type")
}
for _, subset := range nodes { for _, subset := range nodes {
for path, n := range subset { for path, n := range subset {
dl.memory += uint64(n.Size() + len(path)) dl.memory += uint64(n.Size() + len(path))
@ -75,6 +192,12 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
return dl return dl
} }
func (dl *diffLayer) originDiskLayer() *diskLayer {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.origin
}
// rootHash implements the layer interface, returning the root hash of // rootHash implements the layer interface, returning the root hash of
// corresponding state. // corresponding state.
func (dl *diffLayer) rootHash() common.Hash { func (dl *diffLayer) rootHash() common.Hash {
@ -133,6 +256,32 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, hash common.Hash, dept
// Node implements the layer interface, retrieving the trie node blob with the // Node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found. // provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { func (dl *diffLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
if n := dl.cache.Get(hash); n != nil {
// The query from the hash map is fastpath,
// avoiding recursive query of 128 difflayers.
diffHashCacheHitMeter.Mark(1)
diffHashCacheReadMeter.Mark(int64(len(n.Blob)))
return n.Blob, nil
}
diffHashCacheMissMeter.Mark(1)
persistLayer := dl.originDiskLayer()
if persistLayer != nil {
blob, err := persistLayer.Node(owner, path, hash)
if err != nil {
// This is a bad case with a very low probability.
// r/w the difflayer cache and r/w the disklayer are not in the same lock,
// so in extreme cases, both reading the difflayer cache and reading the disklayer may fail, eg, disklayer is stale.
// In this case, fallback to the original 128-layer recursive difflayer query path.
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to query origin failed", "owner", owner, "path", path, "hash", hash.String(), "error", err)
return dl.node(owner, path, hash, 0)
} else { // This is the fastpath.
return blob, nil
}
}
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to origin is nil", "owner", owner, "path", path, "hash", hash.String())
return dl.node(owner, path, hash, 0) return dl.node(owner, path, hash, 0)
} }

@ -286,6 +286,9 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
} }
log.Debug("Pruned state history", "items", pruned, "tailid", oldest) log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
} }
// The bottom has been eaten by disklayer, releasing the hash cache of bottom difflayer.
bottom.cache.Remove(bottom)
return ndl, nil return ndl, nil
} }

@ -51,9 +51,20 @@ func (tree *layerTree) reset(head layer) {
tree.lock.Lock() tree.lock.Lock()
defer tree.lock.Unlock() defer tree.lock.Unlock()
for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
// Clean up the hash cache of difflayers due to reset.
dl.cache.Remove(dl)
}
}
var layers = make(map[common.Hash]layer) var layers = make(map[common.Hash]layer)
for head != nil { for head != nil {
layers[head.rootHash()] = head layers[head.rootHash()] = head
if dl, ok := head.(*diffLayer); ok {
// Add the hash cache of difflayers due to reset.
dl.cache.Add(dl)
}
head = head.parentLayer() head = head.parentLayer()
} }
tree.layers = layers tree.layers = layers
@ -98,12 +109,19 @@ func (tree *layerTree) add(root common.Hash, parentRoot common.Hash, block uint6
if root == parentRoot { if root == parentRoot {
return errors.New("layer cycle") return errors.New("layer cycle")
} }
if tree.get(root) != nil {
log.Info("Skip add repeated difflayer", "root", root.String(), "block_id", block)
return nil
}
parent := tree.get(parentRoot) parent := tree.get(parentRoot)
if parent == nil { if parent == nil {
return fmt.Errorf("triedb parent [%#x] layer missing", parentRoot) return fmt.Errorf("triedb parent [%#x] layer missing", parentRoot)
} }
l := parent.update(root, parent.stateID()+1, block, nodes.Flatten(), states) l := parent.update(root, parent.stateID()+1, block, nodes.Flatten(), states)
// Before adding layertree, update the hash cache.
l.cache.Add(l)
tree.lock.Lock() tree.lock.Lock()
tree.layers[l.rootHash()] = l tree.layers[l.rootHash()] = l
tree.lock.Unlock() tree.lock.Unlock()
@ -132,8 +150,15 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
if err != nil { if err != nil {
return err return err
} }
for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to cap all", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
// Replace the entire layer tree with the flat base // Replace the entire layer tree with the flat base
tree.layers = map[common.Hash]layer{base.rootHash(): base} tree.layers = map[common.Hash]layer{base.rootHash(): base}
log.Debug("Cap all difflayers to disklayer", "disk_root", base.rootHash().String())
return nil return nil
} }
// Dive until we run out of layers or reach the persistent database // Dive until we run out of layers or reach the persistent database
@ -146,6 +171,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
return nil return nil
} }
} }
var persisted *diskLayer
// We're out of layers, flatten anything below, stopping if it's the disk or if // We're out of layers, flatten anything below, stopping if it's the disk or if
// the memory limit is not yet exceeded. // the memory limit is not yet exceeded.
switch parent := diff.parentLayer().(type) { switch parent := diff.parentLayer().(type) {
@ -166,6 +192,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
diff.parent = base diff.parent = base
diff.lock.Unlock() diff.lock.Unlock()
persisted = base.(*diskLayer)
default: default:
panic(fmt.Sprintf("unknown data layer in triedb: %T", parent)) panic(fmt.Sprintf("unknown data layer in triedb: %T", parent))
@ -180,6 +207,13 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
} }
var remove func(root common.Hash) var remove func(root common.Hash)
remove = func(root common.Hash) { remove = func(root common.Hash) {
if df, exist := tree.layers[root]; exist {
if dl, ok := df.(*diffLayer); ok {
// Clean up the hash cache of the child difflayer corresponding to the stale parent, include the re-org case.
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to reorg", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
delete(tree.layers, root) delete(tree.layers, root)
for _, child := range children[root] { for _, child := range children[root] {
remove(child) remove(child)
@ -189,8 +223,25 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
for root, layer := range tree.layers { for root, layer := range tree.layers {
if dl, ok := layer.(*diskLayer); ok && dl.isStale() { if dl, ok := layer.(*diskLayer); ok && dl.isStale() {
remove(root) remove(root)
log.Debug("Remove stale the disklayer", "disk_root", dl.root.String())
} }
} }
if persisted != nil {
var updateOriginFunc func(root common.Hash)
updateOriginFunc = func(root common.Hash) {
if diff, ok := tree.layers[root].(*diffLayer); ok {
diff.lock.Lock()
diff.origin = persisted
diff.lock.Unlock()
}
for _, child := range children[root] {
updateOriginFunc(child)
}
}
updateOriginFunc(persisted.root)
}
return nil return nil
} }

@ -47,4 +47,10 @@ var (
historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil) historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil)
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
diffHashCacheHitMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/hit", nil)
diffHashCacheReadMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/read", nil)
diffHashCacheMissMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/miss", nil)
diffHashCacheSlowPathMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/slowpath", nil)
diffHashCacheLengthGauge = metrics.NewRegisteredGauge("pathdb/difflayer/hashcache/size", nil)
) )