core, eth, trie: filter out boundary nodes and remove dangling nodes in stacktrie (#28327)

* core, eth, trie: filter out boundary nodes in stacktrie

* eth/protocol/snap: add comments

* Update trie/stacktrie.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth, trie: remove onBoundary callback

* eth/protocols/snap: keep complete boundary nodes

* eth/protocols/snap: skip healing if the storage trie is already complete

* eth, trie: add more metrics

* eth, trie: address comment

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
rjl493456442 2023-10-23 23:31:56 +08:00 committed by GitHub
parent 43e6a3c196
commit ab04aeb855
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 312 additions and 11 deletions

@ -26,4 +26,32 @@ var (
IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)
// deletionGauge is the metric to track how many trie node deletions
// are performed in total during the sync process.
deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil)
// lookupGauge is the metric to track how many trie node lookups are
// performed to determine if node needs to be deleted.
lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil)
// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in account trie are met.
boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil)
// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in storage tries are met.
boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil)
// smallStorageGauge is the metric to track how many storages are small enough
// to retrieved in one or two request.
smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil)
// largeStorageGauge is the metric to track how many storages are large enough
// to retrieved concurrently.
largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil)
// skipStorageHealingGauge is the metric to track how many storages are retrieved
// in multiple requests but healing is not necessary.
skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil)
)

@ -716,6 +716,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}
// cleanPath is used to remove the dangling nodes in the stackTrie.
func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
rawdb.DeleteAccountTrieNode(batch, path)
deletionGauge.Inc(1)
}
if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
rawdb.DeleteStorageTrieNode(batch, owner, path)
deletionGauge.Inc(1)
}
lookupGauge.Inc(1)
}
// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
@ -742,6 +755,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(task.genBatch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge)
}
task.genTrie = trie.NewStackTrie(options)
for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
@ -758,6 +782,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(subtask.genBatch, owner, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge)
}
subtask.genTrie = trie.NewStackTrie(options)
}
}
@ -814,6 +849,17 @@ func (s *Syncer) loadSyncStatus() {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
@ -1968,6 +2014,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
@ -2003,7 +2050,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)
if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
@ -2016,6 +2067,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Keep the left boundary as it's the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
@ -2034,6 +2093,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Skip the left boundary as it's not the first range
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
@ -2089,6 +2159,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner only in the context of the
// path scheme. Deletion is forbidden in the hash scheme, as it can
// disrupt state completeness.
//
// Notably, boundary nodes can be also kept because the whole storage
// trie is complete.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, account, path)
})
}
tr := trie.NewStackTrie(options)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
@ -2112,16 +2193,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil {
if res.subTask.done {
root := res.subTask.genTrie.Commit()
if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
}
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}

@ -17,11 +17,13 @@
package trie
import (
"bytes"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
var (
@ -31,7 +33,12 @@ var (
// StackTrieOptions contains the configured options for manipulating the stackTrie.
type StackTrieOptions struct {
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
Cleaner func(path []byte) // The function to clean up dangling nodes
SkipLeftBoundary bool // Flag whether the nodes on the left boundary are skipped for committing
SkipRightBoundary bool // Flag whether the nodes on the right boundary are skipped for committing
boundaryGauge metrics.Gauge // Gauge to track how many boundary nodes are met
}
// NewStackTrieOptions initializes an empty options for stackTrie.
@ -43,6 +50,22 @@ func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash,
return o
}
// WithCleaner configures the cleaner in the option for removing dangling nodes.
func (o *StackTrieOptions) WithCleaner(cleaner func(path []byte)) *StackTrieOptions {
o.Cleaner = cleaner
return o
}
// WithSkipBoundary configures whether the left and right boundary nodes are
// filtered for committing, along with a gauge metrics to track how many
// boundary nodes are met.
func (o *StackTrieOptions) WithSkipBoundary(skipLeft, skipRight bool, gauge metrics.Gauge) *StackTrieOptions {
o.SkipLeftBoundary = skipLeft
o.SkipRightBoundary = skipRight
o.boundaryGauge = gauge
return o
}
// StackTrie is a trie implementation that expects keys to be inserted
// in order. Once it determines that a subtree will no longer be inserted
// into, it will hash it and free up the memory it uses.
@ -50,6 +73,9 @@ type StackTrie struct {
options *StackTrieOptions
root *stNode
h *hasher
first []byte // The (hex-encoded without terminator) key of first inserted entry, tracked as left boundary.
last []byte // The (hex-encoded without terminator) key of last inserted entry, tracked as right boundary.
}
// NewStackTrie allocates and initializes an empty trie.
@ -72,6 +98,15 @@ func (t *StackTrie) Update(key, value []byte) error {
}
k = k[:len(k)-1] // chop the termination flag
// track the first and last inserted entries.
if t.first == nil {
t.first = append([]byte{}, k...)
}
if t.last == nil {
t.last = append([]byte{}, k...) // allocate key slice
} else {
t.last = append(t.last[:0], k...) // reuse key slice
}
t.insert(t.root, k, value, nil)
return nil
}
@ -88,6 +123,8 @@ func (t *StackTrie) MustUpdate(key, value []byte) {
func (t *StackTrie) Reset() {
t.options = NewStackTrieOptions()
t.root = stPool.Get().(*stNode)
t.first = nil
t.last = nil
}
// stNode represents a node within a StackTrie
@ -306,8 +343,10 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) {
//
// This method also sets 'st.type' to hashedNode, and clears 'st.key'.
func (t *StackTrie) hash(st *stNode, path []byte) {
var blob []byte // RLP-encoded node blob
var (
blob []byte // RLP-encoded node blob
internal [][]byte // List of node paths covered by the extension node
)
switch st.typ {
case hashedNode:
return
@ -342,6 +381,15 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
// recursively hash and commit child as the first step
t.hash(st.children[0], append(path, st.key...))
// Collect the path of internal nodes between shortNode and its **in disk**
// child. This is essential in the case of path mode scheme to avoid leaving
// danging nodes within the range of this internal path on disk, which would
// break the guarantee for state healing.
if len(st.children[0].val) >= 32 && t.options.Cleaner != nil {
for i := 1; i < len(st.key); i++ {
internal = append(internal, append(path, st.key[:i]...))
}
}
// encode the extension node
n := shortNode{Key: hexToCompactInPlace(st.key)}
if len(st.children[0].val) < 32 {
@ -378,10 +426,33 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
// input values.
st.val = t.h.hashData(blob)
// Commit the trie node if the writer is configured.
if t.options.Writer != nil {
t.options.Writer(path, common.BytesToHash(st.val), blob)
// Short circuit if the stack trie is not configured for writing.
if t.options.Writer == nil {
return
}
// Skip committing if the node is on the left boundary and stackTrie is
// configured to filter the boundary.
if t.options.SkipLeftBoundary && bytes.HasPrefix(t.first, path) {
if t.options.boundaryGauge != nil {
t.options.boundaryGauge.Inc(1)
}
return
}
// Skip committing if the node is on the right boundary and stackTrie is
// configured to filter the boundary.
if t.options.SkipRightBoundary && bytes.HasPrefix(t.last, path) {
if t.options.boundaryGauge != nil {
t.options.boundaryGauge.Inc(1)
}
return
}
// Clean up the internal dangling nodes covered by the extension node.
// This should be done before writing the node to adhere to the committing
// order from bottom to top.
for _, path := range internal {
t.options.Cleaner(path)
}
t.options.Writer(path, common.BytesToHash(st.val), blob)
}
// Hash will firstly hash the entire trie if it's still not hashed and then commit

@ -19,11 +19,14 @@ package trie
import (
"bytes"
"math/big"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/trie/testutil"
"golang.org/x/exp/slices"
)
func TestStackTrieInsertAndHash(t *testing.T) {
@ -376,3 +379,87 @@ func TestStacktrieNotModifyValues(t *testing.T) {
}
}
}
func buildPartialTree(entries []*kv, t *testing.T) map[string]common.Hash {
var (
options = NewStackTrieOptions()
nodes = make(map[string]common.Hash)
)
var (
first int
last = len(entries) - 1
noLeft bool
noRight bool
)
// Enter split mode if there are at least two elements
if rand.Intn(5) != 0 {
for {
first = rand.Intn(len(entries))
last = rand.Intn(len(entries))
if first <= last {
break
}
}
if first != 0 {
noLeft = true
}
if last != len(entries)-1 {
noRight = true
}
}
options = options.WithSkipBoundary(noLeft, noRight, nil)
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
nodes[string(path)] = hash
})
tr := NewStackTrie(options)
for i := first; i <= last; i++ {
tr.MustUpdate(entries[i].k, entries[i].v)
}
tr.Commit()
return nodes
}
func TestPartialStackTrie(t *testing.T) {
for round := 0; round < 100; round++ {
var (
n = rand.Intn(100) + 1
entries []*kv
)
for i := 0; i < n; i++ {
var val []byte
if rand.Intn(3) == 0 {
val = testutil.RandBytes(3)
} else {
val = testutil.RandBytes(32)
}
entries = append(entries, &kv{
k: testutil.RandBytes(32),
v: val,
})
}
slices.SortFunc(entries, (*kv).cmp)
var (
nodes = make(map[string]common.Hash)
options = NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
nodes[string(path)] = hash
})
)
tr := NewStackTrie(options)
for i := 0; i < len(entries); i++ {
tr.MustUpdate(entries[i].k, entries[i].v)
}
tr.Commit()
for j := 0; j < 100; j++ {
for path, hash := range buildPartialTree(entries, t) {
if nodes[path] != hash {
t.Errorf("%v, want %x, got %x", []byte(path), nodes[path], hash)
}
}
}
}
}

@ -51,6 +51,18 @@ var (
// lookupGauge is the metric to track how many trie node lookups are
// performed to determine if node needs to be deleted.
lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil)
// accountNodeSyncedGauge is the metric to track how many account trie
// node are written during the sync.
accountNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/account", nil)
// storageNodeSyncedGauge is the metric to track how many account trie
// node are written during the sync.
storageNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/storage", nil)
// codeSyncedGauge is the metric to track how many contract codes are
// written during the sync.
codeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/codes", nil)
)
// SyncPath is a path tuple identifying a particular trie node either in a single
@ -362,10 +374,22 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
// storage, returning any occurred error.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Flush the pending node writes into database batch.
var (
account int
storage int
)
for path, value := range s.membatch.nodes {
owner, inner := ResolvePath([]byte(path))
if owner == (common.Hash{}) {
account += 1
} else {
storage += 1
}
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
}
accountNodeSyncedGauge.Inc(int64(account))
storageNodeSyncedGauge.Inc(int64(storage))
// Flush the pending node deletes into the database batch.
// Please note that each written and deleted node has a
// unique path, ensuring no duplication occurs.
@ -377,6 +401,8 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
for hash, value := range s.membatch.codes {
rawdb.WriteCode(dbw, hash, value)
}
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))
s.membatch = newSyncMemBatch() // reset the batch
return nil
}