bsc/triedb/pathdb/asyncnodebuffer.go

460 lines
13 KiB
Go

package pathdb
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/trienode"
)
// Compile-time assertion that *asyncnodebuffer satisfies trienodebuffer.
var _ trienodebuffer = (*asyncnodebuffer)(nil)
// asyncnodebuffer implements the trienodebuffer interface on top of two node
// caches so that one of them can be persisted to disk by a goroutine while
// the other one keeps accepting new commits.
type asyncnodebuffer struct {
	// mux is held by every method that reads or swaps the two caches below.
	mux sync.RWMutex

	// current receives commits and reverts; background is the cache handed to
	// the flush goroutine after the two have been swapped.
	current, background *nodecache

	// isFlushing is true for as long as a flush goroutine is running, while
	// stopFlushing, once set, turns every later flush call into a no-op.
	isFlushing, stopFlushing atomic.Bool
}
// newAsyncNodeBuffer builds an async node buffer whose current cache is seeded
// with the supplied nodes and layer count, and whose background cache starts
// out empty. A nil node set is treated as an empty one. Both caches share the
// same size limit.
func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer {
	if nodes == nil {
		nodes = map[common.Hash]map[string]*trienode.Node{}
	}
	// The initial size is the sum of blob length plus path length of all nodes.
	var total uint64
	for _, set := range nodes {
		for p, nd := range set {
			total += uint64(len(nd.Blob) + len(p))
		}
	}
	capacity := uint64(limit)

	buf := new(asyncnodebuffer)
	buf.current = newNodeCache(capacity, total, nodes, layers)
	buf.background = newNodeCache(capacity, 0, map[common.Hash]map[string]*trienode.Node{}, 0)
	return buf
}
// node looks up the trie node identified by owner and path, first in the
// current cache and then in the background cache. A nil node with a nil error
// means neither cache holds it; a hash mismatch is reported as an error by the
// cache that found the node.
func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
	a.mux.RLock()
	defer a.mux.RUnlock()

	found, err := a.current.node(owner, path, hash)
	switch {
	case err != nil:
		return nil, err
	case found != nil:
		return found, nil
	default:
		// Not in the current cache, fall back to the one being flushed.
		return a.background.node(owner, path, hash)
	}
}
// commit merges the dirty nodes into the current cache and returns the buffer
// itself. The passed map stays owned by the bottom-most diff layer: only the
// node references are retained, which are safe to share. A failure to commit
// is reported through log.Crit.
func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer {
	a.mux.Lock()
	defer a.mux.Unlock()

	if err := a.current.commit(nodes); err != nil {
		log.Crit("[BUG] Failed to commit nodes to asyncnodebuffer", "error", err)
	}
	return a
}
// revert is the inverse of commit: the provided node set must undo the changes
// made by the last state transition. Both caches are first folded into a new
// current cache, the background cache is emptied, and the revert is then
// applied on the merged cache.
//
// NOTE(review): the background cache is reset here without checking whether a
// flush goroutine is still working on it - confirm callers rule that out.
func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
	a.mux.Lock()
	defer a.mux.Unlock()

	merged, err := a.current.merge(a.background)
	a.current = merged
	if err != nil {
		log.Crit("[BUG] Failed to merge node cache under revert async node buffer", "error", err)
	}
	a.background.reset()

	return a.current.revert(db, nodes)
}
// setSize is not supported by asyncnodebuffer: with the double-buffer design
// resizing would have to block, so the call always fails and none of the
// arguments are used.
func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
	err := errors.New("not supported")
	return err
}
// reset drops everything held in memory by clearing the current cache and
// then the background cache.
func (a *asyncnodebuffer) reset() {
	a.mux.Lock()
	defer a.mux.Unlock()

	for _, cache := range []*nodecache{a.current, a.background} {
		cache.reset()
	}
}
// empty reports whether neither the current nor the background cache contains
// any state transition.
func (a *asyncnodebuffer) empty() bool {
	a.mux.RLock()
	defer a.mux.RUnlock()

	if !a.current.empty() {
		return false
	}
	return a.background.empty()
}
// flush persists the in-memory dirty trie nodes to disk. Note, all data must
// be written atomically.
//
// Once waitAndStopFlushing has been called the method is a no-op. Otherwise:
//
//   - force == true: wait (polling, with the buffer lock held) until no
//     background flush is pending, then flush the current cache synchronously
//     and return its error.
//   - force == false: do nothing unless the current cache has reached its size
//     limit and no background flush is pending. In that case the current cache
//     is frozen, swapped with the background cache and written out by a
//     goroutine; the call itself returns nil right away.
//
// The goroutine retries until the write succeeds, pausing for
// DefaultBackgroundFlushInterval seconds after every failure so that a
// persistent error does not turn into a busy loop.
func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
	a.mux.Lock()
	defer a.mux.Unlock()

	if a.stopFlushing.Load() {
		return nil
	}
	interval := time.Duration(DefaultBackgroundFlushInterval) * time.Second

	if force {
		// The background cache stays immutable until its flush has finished.
		for atomic.LoadUint64(&a.background.immutable) == 1 {
			time.Sleep(interval)
			log.Info("Waiting background memory table flushed into disk for forcing flush node buffer")
		}
		atomic.StoreUint64(&a.current.immutable, 1)
		return a.current.flush(db, clean, id)
	}

	if a.current.size < a.current.limit {
		return nil
	}
	// A previous background flush is still in progress.
	if atomic.LoadUint64(&a.background.immutable) == 1 {
		return nil
	}

	// Freeze the filled cache and swap it out, so that new commits land in the
	// (empty) cache that was in the background until now.
	atomic.StoreUint64(&a.current.immutable, 1)
	a.current, a.background = a.background, a.current

	a.isFlushing.Store(true)
	go func(persistID uint64) {
		defer a.isFlushing.Store(false)
		for {
			err := a.background.flush(db, clean, persistID)
			if err == nil {
				log.Debug("Succeed to flush background nodecache to disk", "state_id", persistID)
				return
			}
			log.Error("Failed to flush background nodecache to disk", "state_id", persistID, "error", err)
			// Back off before retrying instead of spinning on the failure.
			time.Sleep(interval)
		}
	}(id)
	return nil
}
// waitAndStopFlushing disables all further flushes and then blocks until the
// flush goroutine, if one is running, has finished. The flag is polled once
// per second and a warning is logged after every wait.
func (a *asyncnodebuffer) waitAndStopFlushing() {
	a.stopFlushing.Store(true)
	for {
		if !a.isFlushing.Load() {
			return
		}
		time.Sleep(time.Second)
		log.Warn("Waiting background memory table flushed into disk")
	}
}
// getAllNodes returns the node set obtained by merging the current and the
// background cache. The merge builds a new cache, so the returned outer and
// per-owner maps are not the ones used internally; the node pointers
// themselves are shared. A merge failure is reported through log.Crit.
func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
	a.mux.Lock()
	defer a.mux.Unlock()

	cached, err := a.current.merge(a.background)
	if err != nil {
		log.Crit("[BUG] Failed to merge node cache under getAllNodes of async node buffer", "error", err)
	}
	return cached.nodes
}
// getLayers returns the number of diff layers aggregated in the current and
// the background cache combined.
func (a *asyncnodebuffer) getLayers() uint64 {
	a.mux.RLock()
	defer a.mux.RUnlock()

	total := a.current.layers
	total += a.background.layers
	return total
}
// getSize returns the aggregated write size of the current cache followed by
// the one of the background cache.
func (a *asyncnodebuffer) getSize() (uint64, uint64) {
	a.mux.RLock()
	defer a.mux.RUnlock()

	currentSize, backgroundSize := a.current.size, a.background.size
	return currentSize, backgroundSize
}
// nodecache is one half of the async node buffer: a set of dirty trie nodes
// together with the bookkeeping needed to flush them.
type nodecache struct {
	// immutable equals 1 while the cache is frozen for flushing to disk in the
	// background. It is read and written with the 64-bit functions of
	// sync/atomic and must therefore stay the first field: only the first word
	// of an allocated struct is guaranteed to be 64-bit aligned on 32-bit
	// platforms.
	immutable uint64

	layers uint64                                    // The number of diff layers aggregated inside
	size   uint64                                    // The size of aggregated writes
	limit  uint64                                    // The maximum memory allowance in bytes
	nodes  map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
}
// newNodeCache wraps the given node set into a mutable node cache with the
// supplied size limit, current size and layer count. The node map is used as
// is, it is not copied.
func newNodeCache(limit, size uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodecache {
	nc := new(nodecache) // immutable starts out as zero, i.e. mutable
	nc.layers, nc.size, nc.limit = layers, size, limit
	nc.nodes = nodes
	return nc
}
// node returns the cached trie node stored under owner and path. It returns
// nil without an error if the node is not cached, and an unexpected-node error
// if a node is cached there but its hash differs from the requested one.
func (nc *nodecache) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
	// Indexing the (possibly missing, hence nil) per-owner map is safe and
	// simply reports the node as absent.
	n, ok := nc.nodes[owner][string(path)]
	if !ok {
		return nil, nil
	}
	if n.Hash == hash {
		return n, nil
	}
	dirtyFalseMeter.Mark(1)
	log.Error("Unexpected trie node in async node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
	return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob)
}
// commit merges the given dirty nodes into the cache, counts them as one more
// aggregated layer and adjusts the tracked size. It fails with
// errWriteImmutable if the cache has been frozen for flushing.
func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) error {
	if atomic.LoadUint64(&nc.immutable) == 1 {
		return errWriteImmutable
	}
	var growth, replaced, replacedBytes int64

	for owner, incoming := range nodes {
		target, found := nc.nodes[owner]
		if !found {
			// Allocate a fresh map instead of adopting the passed one. The
			// nodes of the original diff layer remain accessible after the
			// merge, so that layer keeps the ownership of its map and must
			// never see it mutated or accessed concurrently through here.
			target = make(map[string]*trienode.Node)
			nc.nodes[owner] = target
		}
		for path, n := range incoming {
			if prev, ok := target[path]; ok {
				// Overwriting: only the blob length changes, the path was
				// accounted for already.
				growth += int64(len(n.Blob) - len(prev.Blob))
				replaced++
				replacedBytes += int64(len(prev.Blob) + len(path))
			} else {
				growth += int64(len(n.Blob) + len(path))
			}
			target[path] = n
		}
	}
	nc.updateSize(growth)
	nc.layers++

	gcNodesMeter.Mark(replaced)
	gcBytesMeter.Mark(replacedBytes)
	return nil
}
// updateSize applies the signed delta to the tracked size. If the result would
// be negative the size is clamped to zero and the inconsistency is logged.
func (nc *nodecache) updateSize(delta int64) {
	prev := nc.size
	if next := int64(prev) + delta; next >= 0 {
		nc.size = uint64(next)
		return
	}
	nc.size = 0
	log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(prev), "delta", common.StorageSize(delta))
}
// reset empties the cache and makes it mutable again.
//
// The immutable flag is cleared last on purpose. The flush logic polls the
// flag atomically to decide whether this cache can be reused, possibly while
// reset is still running at the end of a background flush. Clearing the flag
// only after the contents have been wiped guarantees that whoever observes the
// cache as mutable also observes it as empty, instead of committing into a
// cache whose fields are about to be overwritten.
func (nc *nodecache) reset() {
	nc.layers = 0
	nc.size = 0
	nc.nodes = make(map[common.Hash]map[string]*trienode.Node)
	atomic.StoreUint64(&nc.immutable, 0)
}
// empty reports whether the cache aggregates no diff layer at all.
func (nc *nodecache) empty() bool {
	if nc.layers > 0 {
		return false
	}
	return true
}
// flush writes all cached nodes together with the new persistent state id to
// the database in one batch and resets the cache afterwards.
//
// It returns errFlushMutable if the cache has not been frozen beforehand, an
// error if the persisted state id plus the number of cached layers does not
// add up to id, or the error of the batch write. In all of these cases the
// cache is left untouched. The clean cache is handed to writeNodes.
func (nc *nodecache) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
	if atomic.LoadUint64(&nc.immutable) != 1 {
		return errFlushMutable
	}
	// Ensure the target state id is aligned with the internal counter.
	head := rawdb.ReadPersistentStateID(db)
	if head+nc.layers != id {
		return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", nc.layers, head, id)
	}
	var (
		start = time.Now()
		batch = db.NewBatchWithSize(int(float64(nc.size) * DefaultBatchRedundancyRate))
	)
	nodes := writeNodes(batch, nc.nodes, clean)
	rawdb.WritePersistentStateID(batch, id)

	// Flush all mutations in a single batch
	size := batch.ValueSize()
	if err := batch.Write(); err != nil {
		return err
	}
	commitBytesMeter.Mark(int64(size))
	commitNodesMeter.Mark(int64(nodes))
	commitTimeTimer.UpdateSince(start)

	// Report the number of written nodes, the same figure the meter gets;
	// len(nc.nodes) would only be the number of owners.
	log.Debug("Persisted pathdb nodes", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
	nc.reset()
	return nil
}
// merge combines nc and nc1 into a newly allocated, mutable node cache and
// leaves both inputs untouched.
//
//   - If both are nil, nil is returned.
//   - If one of them is nil or empty, the result is a copy of the other one.
//     When nc is empty and nc1 is nil, the result is a copy of nc.
//   - Otherwise exactly one of them must be immutable, or
//     errIncompatibleMerge is returned. The nodes of the immutable cache are
//     applied first and those of the mutable cache on top, so the mutable
//     cache wins for paths present in both.
//
// The size of the result is the plain sum of both sizes, which counts a path
// present in both caches twice.
func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
	if nc == nil && nc1 == nil {
		return nil, nil
	}
	if nc == nil || nc.empty() {
		src := nc1
		if src == nil {
			// nc is non-nil (but empty) here; copy it rather than
			// dereferencing the nil copy of nc1.
			src = nc
		}
		res := copyNodeCache(src)
		atomic.StoreUint64(&res.immutable, 0)
		return res, nil
	}
	if nc1 == nil || nc1.empty() {
		res := copyNodeCache(nc)
		atomic.StoreUint64(&res.immutable, 0)
		return res, nil
	}
	if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
		return nil, errIncompatibleMerge
	}
	immutable, mutable := nc1, nc
	if atomic.LoadUint64(&nc.immutable) == 1 {
		immutable, mutable = nc, nc1
	}
	res := &nodecache{
		size:   immutable.size + mutable.size,
		layers: immutable.layers + mutable.layers,
		limit:  immutable.limit,
		nodes:  make(map[common.Hash]map[string]*trienode.Node, len(immutable.nodes)),
	}
	// absorb copies every node of src into the result, overwriting nodes that
	// are already present under the same owner and path.
	absorb := func(src map[common.Hash]map[string]*trienode.Node) {
		for owner, subset := range src {
			dst, ok := res.nodes[owner]
			if !ok {
				dst = make(map[string]*trienode.Node, len(subset))
				res.nodes[owner] = dst
			}
			for path, n := range subset {
				dst[path] = n
			}
		}
	}
	absorb(immutable.nodes)
	absorb(mutable.nodes)
	return res, nil
}
// revert undoes the most recent state transition held in the cache by writing
// the provided nodes over the cached ones. It returns errRevertImmutable for a
// frozen cache and errStateUnrecoverable if no layer is left to revert. It
// panics if the node set refers to an owner that is not cached, or to a node
// that is neither cached nor identical to the one stored in db.
func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
	switch {
	case atomic.LoadUint64(&nc.immutable) == 1:
		return errRevertImmutable
	case nc.layers == 0:
		// Short circuit if no embedded state transition to revert.
		return errStateUnrecoverable
	}
	nc.layers--

	// Reverting the only remaining transition leaves nothing behind, so the
	// whole cache is simply reset.
	if nc.layers == 0 {
		nc.reset()
		return nil
	}
	var change int64
	for owner, subset := range nodes {
		existing, found := nc.nodes[owner]
		if !found {
			panic(fmt.Sprintf("non-existent subset (%x)", owner))
		}
		for path, n := range subset {
			if prev, ok := existing[path]; ok {
				existing[path] = n
				change += int64(len(n.Blob)) - int64(len(prev.Blob))
				continue
			}
			// There is a special case in MPT that one child is removed from
			// a fullNode which only has two children, and then a new child
			// with different position is immediately inserted into the
			// fullNode. In this case, the clean child of the fullNode will
			// also be marked as dirty because of node collapse and expansion.
			//
			// Such a "clean" node is not present in the cache; it is skipped
			// during rollback as long as it matches the node on disk.
			var onDisk common.Hash
			if owner != (common.Hash{}) {
				_, onDisk = rawdb.ReadStorageTrieNode(db, owner, []byte(path))
			} else {
				_, onDisk = rawdb.ReadAccountTrieNode(db, []byte(path))
			}
			if onDisk != n.Hash {
				panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex()))
			}
		}
	}
	nc.updateSize(change)
	return nil
}
// copyNodeCache returns a copy of n, or nil if n is nil. The counters and the
// immutable flag are copied as they are; the outer and the per-owner node maps
// are duplicated while the nodes themselves are shared with the original.
func copyNodeCache(n *nodecache) *nodecache {
	if n == nil {
		return nil
	}
	dup := new(nodecache)
	dup.layers, dup.size, dup.limit = n.layers, n.size, n.limit
	dup.immutable = atomic.LoadUint64(&n.immutable)
	dup.nodes = make(map[common.Hash]map[string]*trienode.Node)

	for owner, set := range n.nodes {
		cloned := make(map[string]*trienode.Node)
		for p, nd := range set {
			cloned[p] = nd
		}
		dup.nodes[owner] = cloned
	}
	return dup
}