diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go
index 9752d6fd8..b9fb37bf7 100644
--- a/eth/ethconfig/gen_config.go
+++ b/eth/ethconfig/gen_config.go
@@ -36,6 +36,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
 		TransactionHistory uint64 `toml:",omitempty"`
 		StateHistory uint64 `toml:",omitempty"`
 		StateScheme string `toml:",omitempty"`
+		PathSyncFlush bool `toml:",omitempty"`
 		RequiredBlocks map[uint64]common.Hash `toml:"-"`
 		LightServ int `toml:",omitempty"`
 		LightIngress int `toml:",omitempty"`
@@ -91,6 +92,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
 	enc.TransactionHistory = c.TransactionHistory
 	enc.StateHistory = c.StateHistory
 	enc.StateScheme = c.StateScheme
+	enc.PathSyncFlush = c.PathSyncFlush
 	enc.RequiredBlocks = c.RequiredBlocks
 	enc.LightServ = c.LightServ
 	enc.LightIngress = c.LightIngress
@@ -150,6 +152,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
 		TransactionHistory *uint64 `toml:",omitempty"`
 		StateHistory *uint64 `toml:",omitempty"`
 		StateScheme *string `toml:",omitempty"`
+		PathSyncFlush *bool `toml:",omitempty"`
 		RequiredBlocks map[uint64]common.Hash `toml:"-"`
 		LightServ *int `toml:",omitempty"`
 		LightIngress *int `toml:",omitempty"`
@@ -246,6 +249,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
 	if dec.StateScheme != nil {
 		c.StateScheme = *dec.StateScheme
 	}
+	if dec.PathSyncFlush != nil {
+		c.PathSyncFlush = *dec.PathSyncFlush
+	}
 	if dec.RequiredBlocks != nil {
 		c.RequiredBlocks = dec.RequiredBlocks
 	}
diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go
index c8c392117..5efb46a91 100644
--- a/trie/triedb/pathdb/asyncnodebuffer.go
+++ b/trie/triedb/pathdb/asyncnodebuffer.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/fastcache"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -18,12 +19,14 @@ import (
 
 var _ trienodebuffer = &asyncnodebuffer{}
 
-// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
+// asyncnodebuffer implement trienodebuffer interface, and async the nodecache
 // to disk.
 type asyncnodebuffer struct {
-	mux        sync.RWMutex
-	current    *nodecache
-	background *nodecache
+	mux          sync.RWMutex
+	current      *nodecache
+	background   *nodecache
+	isFlushing   atomic.Bool
+	stopFlushing atomic.Bool
 }
 
 // newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
@@ -112,24 +115,21 @@ func (a *asyncnodebuffer) empty() bool {
 	return a.current.empty() && a.background.empty()
 }
 
-// setSize sets the buffer size to the provided number, and invokes a flush
-// operation if the current memory usage exceeds the new limit.
-//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
-//	b.limit = uint64(size)
-//	return b.flush(db, clean, id, false)
-//}
-
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
 func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
 	a.mux.Lock()
 	defer a.mux.Unlock()
 
+	if a.stopFlushing.Load() {
+		return nil
+	}
+
 	if force {
 		for {
 			if atomic.LoadUint64(&a.background.immutable) == 1 {
 				time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
-				log.Info("waiting background memory table flush to disk for force flush node buffer")
+				log.Info("waiting background memory table flushed into disk for forcing flush node buffer")
 				continue
 			}
 			atomic.StoreUint64(&a.current.immutable, 1)
@@ -149,26 +149,36 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 	atomic.StoreUint64(&a.current.immutable, 1)
 	a.current, a.background = a.background, a.current
 
-	go func(persistId uint64) {
+	a.isFlushing.Store(true)
+	go func(persistID uint64) {
+		defer a.isFlushing.Store(false)
 		for {
-			err := a.background.flush(db, clean, persistId)
+			err := a.background.flush(db, clean, persistID)
 			if err == nil {
-				log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
+				log.Debug("succeed to flush background nodecache to disk", "state_id", persistID)
 				return
 			}
-			log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
+			log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err)
 		}
 	}(id)
 	return nil
 }
 
+func (a *asyncnodebuffer) waitAndStopFlushing() {
+	a.stopFlushing.Store(true)
+	for a.isFlushing.Load() {
+		time.Sleep(time.Second)
+		log.Warn("waiting background memory table flushed into disk")
+	}
+}
+
 func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
 	a.mux.Lock()
 	defer a.mux.Unlock()
 
 	cached, err := a.current.merge(a.background)
 	if err != nil {
-		log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err)
+		log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
 	}
 	return cached.nodes
 }
@@ -226,6 +236,7 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err
 	if atomic.LoadUint64(&nc.immutable) == 1 {
 		return errWriteImmutable
 	}
+
 	var (
 		delta     int64
 		overwrite int64
@@ -325,12 +336,12 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
 	if nc == nil || nc.empty() {
 		res := copyNodeCache(nc1)
 		atomic.StoreUint64(&res.immutable, 0)
-		return nc1, nil
+		return res, nil
 	}
 	if nc1 == nil || nc1.empty() {
 		res := copyNodeCache(nc)
 		atomic.StoreUint64(&res.immutable, 0)
-		return nc, nil
+		return res, nil
 	}
 	if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
 		return nil, errIncompatibleMerge
@@ -350,7 +361,7 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
 	}
 	res.size = immutable.size + mutable.size
 	res.layers = immutable.layers + mutable.layers
-	res.limit = immutable.size
+	res.limit = immutable.limit
 	res.nodes = make(map[common.Hash]map[string]*trienode.Node)
 	for acc, subTree := range immutable.nodes {
 		if _, ok := res.nodes[acc]; !ok {
diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go
index 59be04c74..d25a65e51 100644
--- a/trie/triedb/pathdb/disklayer.go
+++ b/trie/triedb/pathdb/disklayer.go
@@ -72,6 +72,9 @@ type trienodebuffer interface {
 
 	// getLayers return the size of cached difflayers.
 	getLayers() uint64
+
+	// waitAndStopFlushing blocks until the trie nodes in the trienodebuffer have been written to disk, and stops any further flushing.
+	waitAndStopFlushing()
 }
 
 func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go
index 3c7884cdf..bb4594747 100644
--- a/trie/triedb/pathdb/journal.go
+++ b/trie/triedb/pathdb/journal.go
@@ -242,7 +242,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
 }
 
 // journal implements the layer interface, marshaling the un-flushed trie nodes
-// along with layer meta data into provided byte buffer.
+// along with layer metadata into provided byte buffer.
 func (dl *diskLayer) journal(w io.Writer) error {
 	dl.lock.RLock()
 	defer dl.lock.RUnlock()
@@ -338,6 +338,10 @@ func (dl *diffLayer) journal(w io.Writer) error {
 // flattening everything down (bad for reorgs). And this function will mark the
 // database as read-only to prevent all following mutation to disk.
 func (db *Database) Journal(root common.Hash) error {
+	// Run the journaling
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
 	// Retrieve the head layer to journal from.
 	l := db.tree.get(root)
 	if l == nil {
@@ -351,10 +355,8 @@ func (db *Database) Journal(root common.Hash) error {
 	}
 	start := time.Now()
 
-	// Run the journaling
-	db.lock.Lock()
-	defer db.lock.Unlock()
-
+	// Wait for the trienodebuffer to finish and stop flushing; the asyncnodebuffer needs a fixed disk root.
+	disk.buffer.waitAndStopFlushing()
 	// Short circuit if the database is in read only mode.
 	if db.readOnly {
 		return errSnapshotReadOnly
diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go
index 9d79df37f..a978d559b 100644
--- a/trie/triedb/pathdb/nodebuffer.go
+++ b/trie/triedb/pathdb/nodebuffer.go
@@ -221,7 +221,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 		start = time.Now()
 		// Although the calculation of b.size has been as accurate as possible,
 		// some omissions were still found during testing and code review, but
-		// we are still not sure it is completely accurate. For better protection,
+		// we are still not sure if it is completely accurate. For better protection,
 		// some redundancy is added here.
 		batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
 	)
@@ -241,6 +241,8 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 	return nil
 }
 
+func (b *nodebuffer) waitAndStopFlushing() {}
+
 // writeNodes writes the trie nodes into the provided database batch.
 // Note this function will also inject all the newly written nodes
 // into clean cache.
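
For context, here is a minimal, self-contained Go sketch of the double-buffered flush pattern that asyncnodebuffer relies on in this change: writes land in a front buffer while a frozen back buffer is persisted by a goroutine, and waitAndStopFlushing forbids new flushes and waits for the in-flight one so callers (like Journal) see a fixed on-disk state. This is an illustration only, not part of the patch; asyncFlusher, buffer, and the persist callback are simplified stand-ins for asyncnodebuffer, nodecache, and the background nodecache flush.

// Illustrative sketch only; simplified stand-ins, not the real pathdb types.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// buffer stands in for nodecache: a plain in-memory batch of writes.
type buffer struct {
	entries []string
}

// asyncFlusher mirrors the double-buffer layout of asyncnodebuffer:
// current accepts writes while background is persisted asynchronously.
type asyncFlusher struct {
	mux          sync.Mutex
	current      *buffer
	background   *buffer
	isFlushing   atomic.Bool
	stopFlushing atomic.Bool
}

func newAsyncFlusher() *asyncFlusher {
	return &asyncFlusher{current: &buffer{}, background: &buffer{}}
}

// commit appends data to the mutable front buffer.
func (a *asyncFlusher) commit(entry string) {
	a.mux.Lock()
	defer a.mux.Unlock()
	a.current.entries = append(a.current.entries, entry)
}

// flush swaps the buffers and persists the frozen one in a goroutine,
// retrying until it succeeds, similar to asyncnodebuffer.flush above.
func (a *asyncFlusher) flush(persist func([]string) error) {
	a.mux.Lock()
	defer a.mux.Unlock()

	// Once stopFlushing is set (e.g. while journaling), no new flush starts.
	if a.stopFlushing.Load() || a.isFlushing.Load() {
		return
	}
	a.current, a.background = a.background, a.current

	a.isFlushing.Store(true)
	go func(frozen *buffer) {
		defer a.isFlushing.Store(false)
		for {
			if err := persist(frozen.entries); err == nil {
				return
			}
			time.Sleep(100 * time.Millisecond)
		}
	}(a.background)
}

// waitAndStopFlushing forbids further flushes and blocks until the in-flight
// background flush has finished, so the on-disk state stops moving.
func (a *asyncFlusher) waitAndStopFlushing() {
	a.stopFlushing.Store(true)
	for a.isFlushing.Load() {
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	f := newAsyncFlusher()
	f.commit("node-a")
	f.commit("node-b")
	f.flush(func(entries []string) error {
		fmt.Println("persisted:", entries)
		return nil
	})
	f.waitAndStopFlushing() // after this returns, the persisted state is fixed
	fmt.Println("background flush complete")
}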