eff0bed91b
This pull request removes the `fsync` of index files in freezer.ModifyAncients function for performance gain. Originally, fsync is added after each freezer write operation to ensure the written data is truly transferred into disk. Unfortunately, it turns out `fsync` can be relatively slow, especially on macOS (see https://github.com/ethereum/go-ethereum/issues/28754 for more information). In this pull request, fsync for index file is removed as it turns out index file can be recovered even after a unclean shutdown. But fsync for data file is still kept, as we have no meaningful way to validate the data correctness after unclean shutdown. --- **But why do we need the `fsync` in the first place?** As it's necessary for freezer to survive/recover after the machine crash (e.g. power failure). In linux, whenever the file write is performed, the file metadata update and data update are not necessarily performed at the same time. Typically, the metadata will be flushed/journalled ahead of the file data. Therefore, we make the pessimistic assumption that the file is first extended with invalid "garbage" data (normally zero bytes) and that afterwards the correct data replaces the garbage. We have observed that the index file of the freezer often contain garbage entry with zero value (filenumber = 0, offset = 0) after a machine power failure. It proves that the index file is extended without the data being flushed. And this corruption can destroy the whole freezer data eventually. Performing fsync after each write operation can reduce the time window for data to be transferred to the disk and ensure the correctness of the data in the disk to the greatest extent. --- **How can we maintain this guarantee without relying on fsync?** Because the items in the index file are strictly in order, we can leverage this characteristic to detect the corruption and truncate them when freezer is opened. Specifically these validation rules are performed for each index file: For two consecutive index items: - If their file numbers are the same, then the offset of the latter one MUST not be less than that of the former. - If the file number of the latter one is equal to that of the former plus one, then the offset of the latter one MUST not be 0. - If their file numbers are not equal, and the latter's file number is not equal to the former plus 1, the latter one is valid And also, for the first non-head item, it must refer to the earliest data file, or the next file if the earliest file is not sufficient to place the first item(very special case, only theoretical possible in tests) With these validation rules, we can detect the invalid item in index file with greatest possibility. --- But unfortunately, these scenarios are not covered and could still lead to a freezer corruption if it occurs: **All items in index file are in zero value** It's impossible to distinguish if they are truly zero (e.g. all the data entries maintained in freezer are zero size) or just the garbage left by OS. In this case, these index items will be kept by truncating the entire data file, namely the freezer is corrupted. However, we can consider that the probability of this situation occurring is quite low, and even if it occurs, the freezer can be considered to be close to an empty state. Rerun the state sync should be acceptable. **Index file is integral while relative data file is corrupted** It might be possible the data file is corrupted whose file size is extended correctly with garbage filled (e.g. zero bytes). In this case, it's impossible to detect the corruption by index validation. We can either choose to `fsync` the data file, or blindly believe that if index file is integral then the data file could be integral with very high chance. In this pull request, the first option is taken.
291 lines
9.6 KiB
Go
291 lines
9.6 KiB
Go
// Copyright 2022 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package pathdb
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"maps"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/fastcache"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/trie/trienode"
|
|
)
|
|
|
|
// nodebuffer is a collection of modified trie nodes to aggregate the disk
|
|
// write. The content of the nodebuffer must be checked before diving into
|
|
// disk (since it basically is not-yet-written data).
|
|
type nodebuffer struct {
|
|
layers uint64 // The number of diff layers aggregated inside
|
|
size uint64 // The size of aggregated writes
|
|
limit uint64 // The maximum memory allowance in bytes
|
|
nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
|
|
}
|
|
|
|
// newNodeBuffer initializes the node buffer with the provided nodes.
|
|
func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodebuffer {
|
|
if nodes == nil {
|
|
nodes = make(map[common.Hash]map[string]*trienode.Node)
|
|
}
|
|
var size uint64
|
|
for _, subset := range nodes {
|
|
for path, n := range subset {
|
|
size += uint64(len(n.Blob) + len(path))
|
|
}
|
|
}
|
|
return &nodebuffer{
|
|
layers: layers,
|
|
nodes: nodes,
|
|
size: size,
|
|
limit: uint64(limit),
|
|
}
|
|
}
|
|
|
|
// node retrieves the trie node with given node info.
|
|
func (b *nodebuffer) node(owner common.Hash, path []byte) (*trienode.Node, bool) {
|
|
subset, ok := b.nodes[owner]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
n, ok := subset[string(path)]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
return n, true
|
|
}
|
|
|
|
// commit merges the dirty nodes into the nodebuffer. This operation won't take
|
|
// the ownership of the nodes map which belongs to the bottom-most diff layer.
|
|
// It will just hold the node references from the given map which are safe to
|
|
// copy.
|
|
func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer {
|
|
var (
|
|
delta int64
|
|
overwrite int64
|
|
overwriteSize int64
|
|
)
|
|
for owner, subset := range nodes {
|
|
current, exist := b.nodes[owner]
|
|
if !exist {
|
|
// Allocate a new map for the subset instead of claiming it directly
|
|
// from the passed map to avoid potential concurrent map read/write.
|
|
// The nodes belong to original diff layer are still accessible even
|
|
// after merging, thus the ownership of nodes map should still belong
|
|
// to original layer and any mutation on it should be prevented.
|
|
for path, n := range subset {
|
|
delta += int64(len(n.Blob) + len(path))
|
|
}
|
|
b.nodes[owner] = maps.Clone(subset)
|
|
continue
|
|
}
|
|
for path, n := range subset {
|
|
if orig, exist := current[path]; !exist {
|
|
delta += int64(len(n.Blob) + len(path))
|
|
} else {
|
|
delta += int64(len(n.Blob) - len(orig.Blob))
|
|
overwrite++
|
|
overwriteSize += int64(len(orig.Blob) + len(path))
|
|
}
|
|
current[path] = n
|
|
}
|
|
b.nodes[owner] = current
|
|
}
|
|
b.updateSize(delta)
|
|
b.layers++
|
|
gcNodesMeter.Mark(overwrite)
|
|
gcBytesMeter.Mark(overwriteSize)
|
|
return b
|
|
}
|
|
|
|
// revert is the reverse operation of commit. It also merges the provided nodes
|
|
// into the nodebuffer, the difference is that the provided node set should
|
|
// revert the changes made by the last state transition.
|
|
func (b *nodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
|
|
// Short circuit if no embedded state transition to revert.
|
|
if b.layers == 0 {
|
|
return errStateUnrecoverable
|
|
}
|
|
b.layers--
|
|
|
|
// Reset the entire buffer if only a single transition left.
|
|
if b.layers == 0 {
|
|
b.reset()
|
|
return nil
|
|
}
|
|
var delta int64
|
|
for owner, subset := range nodes {
|
|
current, ok := b.nodes[owner]
|
|
if !ok {
|
|
panic(fmt.Sprintf("non-existent subset (%x)", owner))
|
|
}
|
|
for path, n := range subset {
|
|
orig, ok := current[path]
|
|
if !ok {
|
|
// There is a special case in MPT that one child is removed from
|
|
// a fullNode which only has two children, and then a new child
|
|
// with different position is immediately inserted into the fullNode.
|
|
// In this case, the clean child of the fullNode will also be
|
|
// marked as dirty because of node collapse and expansion.
|
|
//
|
|
// In case of database rollback, don't panic if this "clean"
|
|
// node occurs which is not present in buffer.
|
|
var blob []byte
|
|
if owner == (common.Hash{}) {
|
|
blob = rawdb.ReadAccountTrieNode(db, []byte(path))
|
|
} else {
|
|
blob = rawdb.ReadStorageTrieNode(db, owner, []byte(path))
|
|
}
|
|
// Ignore the clean node in the case described above.
|
|
if bytes.Equal(blob, n.Blob) {
|
|
continue
|
|
}
|
|
panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex()))
|
|
}
|
|
current[path] = n
|
|
delta += int64(len(n.Blob)) - int64(len(orig.Blob))
|
|
}
|
|
}
|
|
b.updateSize(delta)
|
|
return nil
|
|
}
|
|
|
|
// updateSize updates the total cache size by the given delta.
|
|
func (b *nodebuffer) updateSize(delta int64) {
|
|
size := int64(b.size) + delta
|
|
if size >= 0 {
|
|
b.size = uint64(size)
|
|
return
|
|
}
|
|
s := b.size
|
|
b.size = 0
|
|
log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(s), "delta", common.StorageSize(delta))
|
|
}
|
|
|
|
// reset cleans up the disk cache.
|
|
func (b *nodebuffer) reset() {
|
|
b.layers = 0
|
|
b.size = 0
|
|
b.nodes = make(map[common.Hash]map[string]*trienode.Node)
|
|
}
|
|
|
|
// empty returns an indicator if nodebuffer contains any state transition inside.
|
|
func (b *nodebuffer) empty() bool {
|
|
return b.layers == 0
|
|
}
|
|
|
|
// setSize sets the buffer size to the provided number, and invokes a flush
|
|
// operation if the current memory usage exceeds the new limit.
|
|
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
|
|
b.limit = uint64(size)
|
|
return b.flush(db, freezer, clean, id, false)
|
|
}
|
|
|
|
// allocBatch returns a database batch with pre-allocated buffer.
|
|
func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
|
|
var metasize int
|
|
for owner, nodes := range b.nodes {
|
|
if owner == (common.Hash{}) {
|
|
metasize += len(nodes) * len(rawdb.TrieNodeAccountPrefix) // database key prefix
|
|
} else {
|
|
metasize += len(nodes) * (len(rawdb.TrieNodeStoragePrefix) + common.HashLength) // database key prefix + owner
|
|
}
|
|
}
|
|
return db.NewBatchWithSize((metasize + int(b.size)) * 11 / 10) // extra 10% for potential pebble internal stuff
|
|
}
|
|
|
|
// flush persists the in-memory dirty trie node into the disk if the configured
|
|
// memory threshold is reached. Note, all data must be written atomically.
|
|
func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
|
|
if b.size <= b.limit && !force {
|
|
return nil
|
|
}
|
|
// Ensure the target state id is aligned with the internal counter.
|
|
head := rawdb.ReadPersistentStateID(db)
|
|
if head+b.layers != id {
|
|
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
|
|
}
|
|
var (
|
|
start = time.Now()
|
|
batch = b.allocBatch(db)
|
|
)
|
|
// Explicitly sync the state freezer, ensuring that all written
|
|
// data is transferred to disk before updating the key-value store.
|
|
if freezer != nil {
|
|
if err := freezer.Sync(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
nodes := writeNodes(batch, b.nodes, clean)
|
|
rawdb.WritePersistentStateID(batch, id)
|
|
|
|
// Flush all mutations in a single batch
|
|
size := batch.ValueSize()
|
|
if err := batch.Write(); err != nil {
|
|
return err
|
|
}
|
|
commitBytesMeter.Mark(int64(size))
|
|
commitNodesMeter.Mark(int64(nodes))
|
|
commitTimeTimer.UpdateSince(start)
|
|
log.Debug("Persisted pathdb nodes", "nodes", len(b.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
|
|
b.reset()
|
|
return nil
|
|
}
|
|
|
|
// writeNodes writes the trie nodes into the provided database batch.
|
|
// Note this function will also inject all the newly written nodes
|
|
// into clean cache.
|
|
func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache) (total int) {
|
|
for owner, subset := range nodes {
|
|
for path, n := range subset {
|
|
if n.IsDeleted() {
|
|
if owner == (common.Hash{}) {
|
|
rawdb.DeleteAccountTrieNode(batch, []byte(path))
|
|
} else {
|
|
rawdb.DeleteStorageTrieNode(batch, owner, []byte(path))
|
|
}
|
|
if clean != nil {
|
|
clean.Del(cacheKey(owner, []byte(path)))
|
|
}
|
|
} else {
|
|
if owner == (common.Hash{}) {
|
|
rawdb.WriteAccountTrieNode(batch, []byte(path), n.Blob)
|
|
} else {
|
|
rawdb.WriteStorageTrieNode(batch, owner, []byte(path), n.Blob)
|
|
}
|
|
if clean != nil {
|
|
clean.Set(cacheKey(owner, []byte(path)), n.Blob)
|
|
}
|
|
}
|
|
}
|
|
total += len(subset)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// cacheKey constructs the unique key of clean cache.
|
|
func cacheKey(owner common.Hash, path []byte) []byte {
|
|
if owner == (common.Hash{}) {
|
|
return path
|
|
}
|
|
return append(owner.Bytes(), path...)
|
|
}
|