eff0bed91b
This pull request removes the `fsync` of index files in freezer.ModifyAncients function for performance gain. Originally, fsync is added after each freezer write operation to ensure the written data is truly transferred into disk. Unfortunately, it turns out `fsync` can be relatively slow, especially on macOS (see https://github.com/ethereum/go-ethereum/issues/28754 for more information). In this pull request, fsync for index file is removed as it turns out index file can be recovered even after a unclean shutdown. But fsync for data file is still kept, as we have no meaningful way to validate the data correctness after unclean shutdown. --- **But why do we need the `fsync` in the first place?** As it's necessary for freezer to survive/recover after the machine crash (e.g. power failure). In linux, whenever the file write is performed, the file metadata update and data update are not necessarily performed at the same time. Typically, the metadata will be flushed/journalled ahead of the file data. Therefore, we make the pessimistic assumption that the file is first extended with invalid "garbage" data (normally zero bytes) and that afterwards the correct data replaces the garbage. We have observed that the index file of the freezer often contain garbage entry with zero value (filenumber = 0, offset = 0) after a machine power failure. It proves that the index file is extended without the data being flushed. And this corruption can destroy the whole freezer data eventually. Performing fsync after each write operation can reduce the time window for data to be transferred to the disk and ensure the correctness of the data in the disk to the greatest extent. --- **How can we maintain this guarantee without relying on fsync?** Because the items in the index file are strictly in order, we can leverage this characteristic to detect the corruption and truncate them when freezer is opened. Specifically these validation rules are performed for each index file: For two consecutive index items: - If their file numbers are the same, then the offset of the latter one MUST not be less than that of the former. - If the file number of the latter one is equal to that of the former plus one, then the offset of the latter one MUST not be 0. - If their file numbers are not equal, and the latter's file number is not equal to the former plus 1, the latter one is valid And also, for the first non-head item, it must refer to the earliest data file, or the next file if the earliest file is not sufficient to place the first item(very special case, only theoretical possible in tests) With these validation rules, we can detect the invalid item in index file with greatest possibility. --- But unfortunately, these scenarios are not covered and could still lead to a freezer corruption if it occurs: **All items in index file are in zero value** It's impossible to distinguish if they are truly zero (e.g. all the data entries maintained in freezer are zero size) or just the garbage left by OS. In this case, these index items will be kept by truncating the entire data file, namely the freezer is corrupted. However, we can consider that the probability of this situation occurring is quite low, and even if it occurs, the freezer can be considered to be close to an empty state. Rerun the state sync should be acceptable. **Index file is integral while relative data file is corrupted** It might be possible the data file is corrupted whose file size is extended correctly with garbage filled (e.g. zero bytes). In this case, it's impossible to detect the corruption by index validation. We can either choose to `fsync` the data file, or blindly believe that if index file is integral then the data file could be integral with very high chance. In this pull request, the first option is taken.
316 lines
10 KiB
Go
316 lines
10 KiB
Go
// Copyright 2022 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package pathdb
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/fastcache"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/trie/trienode"
|
|
"github.com/ethereum/go-ethereum/trie/triestate"
|
|
)
|
|
|
|
// diskLayer is a low level persistent layer built on top of a key-value store.
|
|
type diskLayer struct {
|
|
root common.Hash // Immutable, root hash to which this layer was made for
|
|
id uint64 // Immutable, corresponding state id
|
|
db *Database // Path-based trie database
|
|
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
|
|
buffer *nodebuffer // Node buffer to aggregate writes
|
|
stale bool // Signals that the layer became stale (state progressed)
|
|
lock sync.RWMutex // Lock used to protect stale flag
|
|
}
|
|
|
|
// newDiskLayer creates a new disk layer based on the passing arguments.
|
|
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
|
|
// Initialize a clean cache if the memory allowance is not zero
|
|
// or reuse the provided cache if it is not nil (inherited from
|
|
// the original disk layer).
|
|
if cleans == nil && db.config.CleanCacheSize != 0 {
|
|
cleans = fastcache.New(db.config.CleanCacheSize)
|
|
}
|
|
return &diskLayer{
|
|
root: root,
|
|
id: id,
|
|
db: db,
|
|
cleans: cleans,
|
|
buffer: buffer,
|
|
}
|
|
}
|
|
|
|
// rootHash implements the layer interface, returning root hash of corresponding state.
|
|
func (dl *diskLayer) rootHash() common.Hash {
|
|
return dl.root
|
|
}
|
|
|
|
// stateID implements the layer interface, returning the state id of disk layer.
|
|
func (dl *diskLayer) stateID() uint64 {
|
|
return dl.id
|
|
}
|
|
|
|
// parentLayer implements the layer interface, returning nil as there's no layer
|
|
// below the disk.
|
|
func (dl *diskLayer) parentLayer() layer {
|
|
return nil
|
|
}
|
|
|
|
// isStale return whether this layer has become stale (was flattened across) or if
|
|
// it's still live.
|
|
func (dl *diskLayer) isStale() bool {
|
|
dl.lock.RLock()
|
|
defer dl.lock.RUnlock()
|
|
|
|
return dl.stale
|
|
}
|
|
|
|
// markStale sets the stale flag as true.
|
|
func (dl *diskLayer) markStale() {
|
|
dl.lock.Lock()
|
|
defer dl.lock.Unlock()
|
|
|
|
if dl.stale {
|
|
panic("triedb disk layer is stale") // we've committed into the same base from two children, boom
|
|
}
|
|
dl.stale = true
|
|
}
|
|
|
|
// node implements the layer interface, retrieving the trie node with the
|
|
// provided node info. No error will be returned if the node is not found.
|
|
func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) {
|
|
dl.lock.RLock()
|
|
defer dl.lock.RUnlock()
|
|
|
|
if dl.stale {
|
|
return nil, common.Hash{}, nil, errSnapshotStale
|
|
}
|
|
// Try to retrieve the trie node from the not-yet-written
|
|
// node buffer first. Note the buffer is lock free since
|
|
// it's impossible to mutate the buffer before tagging the
|
|
// layer as stale.
|
|
n, found := dl.buffer.node(owner, path)
|
|
if found {
|
|
dirtyHitMeter.Mark(1)
|
|
dirtyReadMeter.Mark(int64(len(n.Blob)))
|
|
dirtyNodeHitDepthHist.Update(int64(depth))
|
|
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
|
|
}
|
|
dirtyMissMeter.Mark(1)
|
|
|
|
// Try to retrieve the trie node from the clean memory cache
|
|
h := newHasher()
|
|
defer h.release()
|
|
|
|
key := cacheKey(owner, path)
|
|
if dl.cleans != nil {
|
|
if blob := dl.cleans.Get(nil, key); len(blob) > 0 {
|
|
cleanHitMeter.Mark(1)
|
|
cleanReadMeter.Mark(int64(len(blob)))
|
|
return blob, h.hash(blob), &nodeLoc{loc: locCleanCache, depth: depth}, nil
|
|
}
|
|
cleanMissMeter.Mark(1)
|
|
}
|
|
// Try to retrieve the trie node from the disk.
|
|
var blob []byte
|
|
if owner == (common.Hash{}) {
|
|
blob = rawdb.ReadAccountTrieNode(dl.db.diskdb, path)
|
|
} else {
|
|
blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
|
|
}
|
|
if dl.cleans != nil && len(blob) > 0 {
|
|
dl.cleans.Set(key, blob)
|
|
cleanWriteMeter.Mark(int64(len(blob)))
|
|
}
|
|
return blob, h.hash(blob), &nodeLoc{loc: locDiskLayer, depth: depth}, nil
|
|
}
|
|
|
|
// update implements the layer interface, returning a new diff layer on top
|
|
// with the given state set.
|
|
func (dl *diskLayer) update(root common.Hash, id uint64, block uint64, nodes map[common.Hash]map[string]*trienode.Node, states *triestate.Set) *diffLayer {
|
|
return newDiffLayer(dl, root, id, block, nodes, states)
|
|
}
|
|
|
|
// commit merges the given bottom-most diff layer into the node buffer
|
|
// and returns a newly constructed disk layer. Note the current disk
|
|
// layer must be tagged as stale first to prevent re-access.
|
|
func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
|
dl.lock.Lock()
|
|
defer dl.lock.Unlock()
|
|
|
|
// Construct and store the state history first. If crash happens after storing
|
|
// the state history but without flushing the corresponding states(journal),
|
|
// the stored state history will be truncated from head in the next restart.
|
|
var (
|
|
overflow bool
|
|
oldest uint64
|
|
)
|
|
if dl.db.freezer != nil {
|
|
err := writeHistory(dl.db.freezer, bottom)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Determine if the persisted history object has exceeded the configured
|
|
// limitation, set the overflow as true if so.
|
|
tail, err := dl.db.freezer.Tail()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
limit := dl.db.config.StateHistory
|
|
if limit != 0 && bottom.stateID()-tail > limit {
|
|
overflow = true
|
|
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
|
|
}
|
|
}
|
|
// Mark the diskLayer as stale before applying any mutations on top.
|
|
dl.stale = true
|
|
|
|
// Store the root->id lookup afterwards. All stored lookups are identified
|
|
// by the **unique** state root. It's impossible that in the same chain
|
|
// blocks are not adjacent but have the same root.
|
|
if dl.id == 0 {
|
|
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
|
|
}
|
|
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
|
|
|
|
// Construct a new disk layer by merging the nodes from the provided diff
|
|
// layer, and flush the content in disk layer if there are too many nodes
|
|
// cached. The clean cache is inherited from the original disk layer.
|
|
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
|
|
|
|
// In a unique scenario where the ID of the oldest history object (after tail
|
|
// truncation) surpasses the persisted state ID, we take the necessary action
|
|
// of forcibly committing the cached dirty nodes to ensure that the persisted
|
|
// state ID remains higher.
|
|
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
|
|
force = true
|
|
}
|
|
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
|
|
return nil, err
|
|
}
|
|
// To remove outdated history objects from the end, we set the 'tail' parameter
|
|
// to 'oldest-1' due to the offset between the freezer index and the history ID.
|
|
if overflow {
|
|
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
|
|
}
|
|
return ndl, nil
|
|
}
|
|
|
|
// revert applies the given state history and return a reverted disk layer.
|
|
func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
|
|
if h.meta.root != dl.rootHash() {
|
|
return nil, errUnexpectedHistory
|
|
}
|
|
if dl.id == 0 {
|
|
return nil, fmt.Errorf("%w: zero state id", errStateUnrecoverable)
|
|
}
|
|
// Apply the reverse state changes upon the current state. This must
|
|
// be done before holding the lock in order to access state in "this"
|
|
// layer.
|
|
nodes, err := apply(dl.db, h.meta.parent, h.meta.root, h.accounts, h.storages)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Mark the diskLayer as stale before applying any mutations on top.
|
|
dl.lock.Lock()
|
|
defer dl.lock.Unlock()
|
|
|
|
dl.stale = true
|
|
|
|
// State change may be applied to node buffer, or the persistent
|
|
// state, depends on if node buffer is empty or not. If the node
|
|
// buffer is not empty, it means that the state transition that
|
|
// needs to be reverted is not yet flushed and cached in node
|
|
// buffer, otherwise, manipulate persistent state directly.
|
|
if !dl.buffer.empty() {
|
|
err := dl.buffer.revert(dl.db.diskdb, nodes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
batch := dl.db.diskdb.NewBatch()
|
|
writeNodes(batch, nodes, dl.cleans)
|
|
rawdb.WritePersistentStateID(batch, dl.id-1)
|
|
if err := batch.Write(); err != nil {
|
|
log.Crit("Failed to write states", "err", err)
|
|
}
|
|
}
|
|
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
|
|
}
|
|
|
|
// setBufferSize sets the node buffer size to the provided value.
|
|
func (dl *diskLayer) setBufferSize(size int) error {
|
|
dl.lock.RLock()
|
|
defer dl.lock.RUnlock()
|
|
|
|
if dl.stale {
|
|
return errSnapshotStale
|
|
}
|
|
return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
|
|
}
|
|
|
|
// size returns the approximate size of cached nodes in the disk layer.
|
|
func (dl *diskLayer) size() common.StorageSize {
|
|
dl.lock.RLock()
|
|
defer dl.lock.RUnlock()
|
|
|
|
if dl.stale {
|
|
return 0
|
|
}
|
|
return common.StorageSize(dl.buffer.size)
|
|
}
|
|
|
|
// resetCache releases the memory held by clean cache to prevent memory leak.
|
|
func (dl *diskLayer) resetCache() {
|
|
dl.lock.RLock()
|
|
defer dl.lock.RUnlock()
|
|
|
|
// Stale disk layer loses the ownership of clean cache.
|
|
if dl.stale {
|
|
return
|
|
}
|
|
if dl.cleans != nil {
|
|
dl.cleans.Reset()
|
|
}
|
|
}
|
|
|
|
// hasher is used to compute the sha256 hash of the provided data.
|
|
type hasher struct{ sha crypto.KeccakState }
|
|
|
|
var hasherPool = sync.Pool{
|
|
New: func() interface{} { return &hasher{sha: crypto.NewKeccakState()} },
|
|
}
|
|
|
|
func newHasher() *hasher {
|
|
return hasherPool.Get().(*hasher)
|
|
}
|
|
|
|
func (h *hasher) hash(data []byte) common.Hash {
|
|
return crypto.HashData(h.sha, data)
|
|
}
|
|
|
|
func (h *hasher) release() {
|
|
hasherPool.Put(h)
|
|
}
|