core, eth, trie: filter out boundary nodes and remove dangling nodes in (#28327)
This commit is contained in:
parent
3e6c16afd3
commit
d9873bbf38
@ -26,4 +26,32 @@ var (
|
|||||||
|
|
||||||
IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
|
IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
|
||||||
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)
|
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)
|
||||||
|
|
||||||
|
// deletionGauge is the metric to track how many trie node deletions
|
||||||
|
// are performed in total during the sync process.
|
||||||
|
deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil)
|
||||||
|
|
||||||
|
// lookupGauge is the metric to track how many trie node lookups are
|
||||||
|
// performed to determine if node needs to be deleted.
|
||||||
|
lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil)
|
||||||
|
|
||||||
|
// boundaryAccountNodesGauge is the metric to track how many boundary trie
|
||||||
|
// nodes in account trie are met.
|
||||||
|
boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil)
|
||||||
|
|
||||||
|
// boundaryAccountNodesGauge is the metric to track how many boundary trie
|
||||||
|
// nodes in storage tries are met.
|
||||||
|
boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil)
|
||||||
|
|
||||||
|
// smallStorageGauge is the metric to track how many storages are small enough
|
||||||
|
// to retrieved in one or two request.
|
||||||
|
smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil)
|
||||||
|
|
||||||
|
// largeStorageGauge is the metric to track how many storages are large enough
|
||||||
|
// to retrieved concurrently.
|
||||||
|
largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil)
|
||||||
|
|
||||||
|
// skipStorageHealingGauge is the metric to track how many storages are retrieved
|
||||||
|
// in multiple requests but healing is not necessary.
|
||||||
|
skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil)
|
||||||
)
|
)
|
||||||
|
@ -717,6 +717,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanPath is used to remove the dangling nodes in the stackTrie.
|
||||||
|
func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
|
||||||
|
if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
|
||||||
|
rawdb.DeleteAccountTrieNode(batch, path)
|
||||||
|
deletionGauge.Inc(1)
|
||||||
|
}
|
||||||
|
if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
|
||||||
|
rawdb.DeleteStorageTrieNode(batch, owner, path)
|
||||||
|
deletionGauge.Inc(1)
|
||||||
|
}
|
||||||
|
lookupGauge.Inc(1)
|
||||||
|
}
|
||||||
|
|
||||||
// loadSyncStatus retrieves a previously aborted sync status from the database,
|
// loadSyncStatus retrieves a previously aborted sync status from the database,
|
||||||
// or generates a fresh one if none is available.
|
// or generates a fresh one if none is available.
|
||||||
func (s *Syncer) loadSyncStatus() {
|
func (s *Syncer) loadSyncStatus() {
|
||||||
@ -743,6 +756,17 @@ func (s *Syncer) loadSyncStatus() {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
// Configure the dangling node cleaner and also filter out boundary nodes
|
||||||
|
// only in the context of the path scheme. Deletion is forbidden in the
|
||||||
|
// hash scheme, as it can disrupt state completeness.
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(task.genBatch, common.Hash{}, path)
|
||||||
|
})
|
||||||
|
// Skip the left boundary if it's not the first range.
|
||||||
|
// Skip the right boundary if it's not the last range.
|
||||||
|
options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge)
|
||||||
|
}
|
||||||
task.genTrie = trie.NewStackTrie(options)
|
task.genTrie = trie.NewStackTrie(options)
|
||||||
for accountHash, subtasks := range task.SubTasks {
|
for accountHash, subtasks := range task.SubTasks {
|
||||||
for _, subtask := range subtasks {
|
for _, subtask := range subtasks {
|
||||||
@ -759,6 +783,17 @@ func (s *Syncer) loadSyncStatus() {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
// Configure the dangling node cleaner and also filter out boundary nodes
|
||||||
|
// only in the context of the path scheme. Deletion is forbidden in the
|
||||||
|
// hash scheme, as it can disrupt state completeness.
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(subtask.genBatch, owner, path)
|
||||||
|
})
|
||||||
|
// Skip the left boundary if it's not the first range.
|
||||||
|
// Skip the right boundary if it's not the last range.
|
||||||
|
options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge)
|
||||||
|
}
|
||||||
subtask.genTrie = trie.NewStackTrie(options)
|
subtask.genTrie = trie.NewStackTrie(options)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -815,6 +850,17 @@ func (s *Syncer) loadSyncStatus() {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
// Configure the dangling node cleaner and also filter out boundary nodes
|
||||||
|
// only in the context of the path scheme. Deletion is forbidden in the
|
||||||
|
// hash scheme, as it can disrupt state completeness.
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(batch, common.Hash{}, path)
|
||||||
|
})
|
||||||
|
// Skip the left boundary if it's not the first range.
|
||||||
|
// Skip the right boundary if it's not the last range.
|
||||||
|
options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge)
|
||||||
|
}
|
||||||
s.tasks = append(s.tasks, &accountTask{
|
s.tasks = append(s.tasks, &accountTask{
|
||||||
Next: next,
|
Next: next,
|
||||||
Last: last,
|
Last: last,
|
||||||
@ -1972,6 +2018,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
|
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
|
||||||
res.mainTask.needState[j] = false
|
res.mainTask.needState[j] = false
|
||||||
res.mainTask.pend--
|
res.mainTask.pend--
|
||||||
|
smallStorageGauge.Inc(1)
|
||||||
}
|
}
|
||||||
// If the last contract was chunked, mark it as needing healing
|
// If the last contract was chunked, mark it as needing healing
|
||||||
// to avoid writing it out to disk prematurely.
|
// to avoid writing it out to disk prematurely.
|
||||||
@ -2007,7 +2054,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
|
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
|
||||||
}
|
}
|
||||||
r := newHashRange(lastKey, chunks)
|
r := newHashRange(lastKey, chunks)
|
||||||
|
if chunks == 1 {
|
||||||
|
smallStorageGauge.Inc(1)
|
||||||
|
} else {
|
||||||
|
largeStorageGauge.Inc(1)
|
||||||
|
}
|
||||||
// Our first task is the one that was just filled by this response.
|
// Our first task is the one that was just filled by this response.
|
||||||
batch := ethdb.HookedBatch{
|
batch := ethdb.HookedBatch{
|
||||||
Batch: s.db.NewBatch(),
|
Batch: s.db.NewBatch(),
|
||||||
@ -2020,6 +2071,14 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(batch, owner, path)
|
||||||
|
})
|
||||||
|
// Keep the left boundary as it's the first range.
|
||||||
|
// Skip the right boundary if it's not the last range.
|
||||||
|
options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge)
|
||||||
|
}
|
||||||
tasks = append(tasks, &storageTask{
|
tasks = append(tasks, &storageTask{
|
||||||
Next: common.Hash{},
|
Next: common.Hash{},
|
||||||
Last: r.End(),
|
Last: r.End(),
|
||||||
@ -2038,6 +2097,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
// Configure the dangling node cleaner and also filter out boundary nodes
|
||||||
|
// only in the context of the path scheme. Deletion is forbidden in the
|
||||||
|
// hash scheme, as it can disrupt state completeness.
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(batch, owner, path)
|
||||||
|
})
|
||||||
|
// Skip the left boundary as it's not the first range
|
||||||
|
// Skip the right boundary if it's not the last range.
|
||||||
|
options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge)
|
||||||
|
}
|
||||||
tasks = append(tasks, &storageTask{
|
tasks = append(tasks, &storageTask{
|
||||||
Next: r.Start(),
|
Next: r.Start(),
|
||||||
Last: r.End(),
|
Last: r.End(),
|
||||||
@ -2093,6 +2163,17 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
|
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
|
||||||
})
|
})
|
||||||
|
if s.scheme == rawdb.PathScheme {
|
||||||
|
// Configure the dangling node cleaner only in the context of the
|
||||||
|
// path scheme. Deletion is forbidden in the hash scheme, as it can
|
||||||
|
// disrupt state completeness.
|
||||||
|
//
|
||||||
|
// Notably, boundary nodes can be also kept because the whole storage
|
||||||
|
// trie is complete.
|
||||||
|
options = options.WithCleaner(func(path []byte) {
|
||||||
|
s.cleanPath(batch, account, path)
|
||||||
|
})
|
||||||
|
}
|
||||||
tr := trie.NewStackTrie(options)
|
tr := trie.NewStackTrie(options)
|
||||||
for j := 0; j < len(res.hashes[i]); j++ {
|
for j := 0; j < len(res.hashes[i]); j++ {
|
||||||
tr.Update(res.hashes[i][j][:], res.slots[i][j])
|
tr.Update(res.hashes[i][j][:], res.slots[i][j])
|
||||||
@ -2116,16 +2197,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
|||||||
if res.subTask != nil {
|
if res.subTask != nil {
|
||||||
if res.subTask.done {
|
if res.subTask.done {
|
||||||
root := res.subTask.genTrie.Commit()
|
root := res.subTask.genTrie.Commit()
|
||||||
if root == res.subTask.root {
|
if err := res.subTask.genBatch.Write(); err != nil {
|
||||||
// If the chunk's root is an overflown but full delivery, clear the heal request
|
log.Error("Failed to persist stack slots", "err", err)
|
||||||
|
}
|
||||||
|
res.subTask.genBatch.Reset()
|
||||||
|
|
||||||
|
// If the chunk's root is an overflown but full delivery,
|
||||||
|
// clear the heal request.
|
||||||
|
accountHash := res.accounts[len(res.accounts)-1]
|
||||||
|
if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
|
||||||
for i, account := range res.mainTask.res.hashes {
|
for i, account := range res.mainTask.res.hashes {
|
||||||
if account == res.accounts[len(res.accounts)-1] {
|
if account == accountHash {
|
||||||
res.mainTask.needHeal[i] = false
|
res.mainTask.needHeal[i] = false
|
||||||
|
skipStorageHealingGauge.Inc(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
|
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
if err := res.subTask.genBatch.Write(); err != nil {
|
if err := res.subTask.genBatch.Write(); err != nil {
|
||||||
log.Error("Failed to persist stack slots", "err", err)
|
log.Error("Failed to persist stack slots", "err", err)
|
||||||
}
|
}
|
||||||
|
@ -17,11 +17,13 @@
|
|||||||
package trie
|
package trie
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -32,6 +34,11 @@ var (
|
|||||||
// StackTrieOptions contains the configured options for manipulating the stackTrie.
|
// StackTrieOptions contains the configured options for manipulating the stackTrie.
|
||||||
type StackTrieOptions struct {
|
type StackTrieOptions struct {
|
||||||
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
|
Writer func(path []byte, hash common.Hash, blob []byte) // The function to commit the dirty nodes
|
||||||
|
Cleaner func(path []byte) // The function to clean up dangling nodes
|
||||||
|
|
||||||
|
SkipLeftBoundary bool // Flag whether the nodes on the left boundary are skipped for committing
|
||||||
|
SkipRightBoundary bool // Flag whether the nodes on the right boundary are skipped for committing
|
||||||
|
boundaryGauge metrics.Gauge // Gauge to track how many boundary nodes are met
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStackTrieOptions initializes an empty options for stackTrie.
|
// NewStackTrieOptions initializes an empty options for stackTrie.
|
||||||
@ -43,6 +50,22 @@ func (o *StackTrieOptions) WithWriter(writer func(path []byte, hash common.Hash,
|
|||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithCleaner configures the cleaner in the option for removing dangling nodes.
|
||||||
|
func (o *StackTrieOptions) WithCleaner(cleaner func(path []byte)) *StackTrieOptions {
|
||||||
|
o.Cleaner = cleaner
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSkipBoundary configures whether the left and right boundary nodes are
|
||||||
|
// filtered for committing, along with a gauge metrics to track how many
|
||||||
|
// boundary nodes are met.
|
||||||
|
func (o *StackTrieOptions) WithSkipBoundary(skipLeft, skipRight bool, gauge metrics.Gauge) *StackTrieOptions {
|
||||||
|
o.SkipLeftBoundary = skipLeft
|
||||||
|
o.SkipRightBoundary = skipRight
|
||||||
|
o.boundaryGauge = gauge
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
// StackTrie is a trie implementation that expects keys to be inserted
|
// StackTrie is a trie implementation that expects keys to be inserted
|
||||||
// in order. Once it determines that a subtree will no longer be inserted
|
// in order. Once it determines that a subtree will no longer be inserted
|
||||||
// into, it will hash it and free up the memory it uses.
|
// into, it will hash it and free up the memory it uses.
|
||||||
@ -50,6 +73,9 @@ type StackTrie struct {
|
|||||||
options *StackTrieOptions
|
options *StackTrieOptions
|
||||||
root *stNode
|
root *stNode
|
||||||
h *hasher
|
h *hasher
|
||||||
|
|
||||||
|
first []byte // The (hex-encoded without terminator) key of first inserted entry, tracked as left boundary.
|
||||||
|
last []byte // The (hex-encoded without terminator) key of last inserted entry, tracked as right boundary.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStackTrie allocates and initializes an empty trie.
|
// NewStackTrie allocates and initializes an empty trie.
|
||||||
@ -72,6 +98,15 @@ func (t *StackTrie) Update(key, value []byte) error {
|
|||||||
}
|
}
|
||||||
k = k[:len(k)-1] // chop the termination flag
|
k = k[:len(k)-1] // chop the termination flag
|
||||||
|
|
||||||
|
// track the first and last inserted entries.
|
||||||
|
if t.first == nil {
|
||||||
|
t.first = append([]byte{}, k...)
|
||||||
|
}
|
||||||
|
if t.last == nil {
|
||||||
|
t.last = append([]byte{}, k...) // allocate key slice
|
||||||
|
} else {
|
||||||
|
t.last = append(t.last[:0], k...) // reuse key slice
|
||||||
|
}
|
||||||
t.insert(t.root, k, value, nil)
|
t.insert(t.root, k, value, nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -88,6 +123,8 @@ func (t *StackTrie) MustUpdate(key, value []byte) {
|
|||||||
func (t *StackTrie) Reset() {
|
func (t *StackTrie) Reset() {
|
||||||
t.options = NewStackTrieOptions()
|
t.options = NewStackTrieOptions()
|
||||||
t.root = stPool.Get().(*stNode)
|
t.root = stPool.Get().(*stNode)
|
||||||
|
t.first = nil
|
||||||
|
t.last = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// stNode represents a node within a StackTrie
|
// stNode represents a node within a StackTrie
|
||||||
@ -306,8 +343,10 @@ func (t *StackTrie) insert(st *stNode, key, value []byte, path []byte) {
|
|||||||
//
|
//
|
||||||
// This method also sets 'st.type' to hashedNode, and clears 'st.key'.
|
// This method also sets 'st.type' to hashedNode, and clears 'st.key'.
|
||||||
func (t *StackTrie) hash(st *stNode, path []byte) {
|
func (t *StackTrie) hash(st *stNode, path []byte) {
|
||||||
var blob []byte // RLP-encoded node blob
|
var (
|
||||||
|
blob []byte // RLP-encoded node blob
|
||||||
|
internal [][]byte // List of node paths covered by the extension node
|
||||||
|
)
|
||||||
switch st.typ {
|
switch st.typ {
|
||||||
case hashedNode:
|
case hashedNode:
|
||||||
return
|
return
|
||||||
@ -342,6 +381,15 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
|
|||||||
// recursively hash and commit child as the first step
|
// recursively hash and commit child as the first step
|
||||||
t.hash(st.children[0], append(path, st.key...))
|
t.hash(st.children[0], append(path, st.key...))
|
||||||
|
|
||||||
|
// Collect the path of internal nodes between shortNode and its **in disk**
|
||||||
|
// child. This is essential in the case of path mode scheme to avoid leaving
|
||||||
|
// danging nodes within the range of this internal path on disk, which would
|
||||||
|
// break the guarantee for state healing.
|
||||||
|
if len(st.children[0].val) >= 32 && t.options.Cleaner != nil {
|
||||||
|
for i := 1; i < len(st.key); i++ {
|
||||||
|
internal = append(internal, append(path, st.key[:i]...))
|
||||||
|
}
|
||||||
|
}
|
||||||
// encode the extension node
|
// encode the extension node
|
||||||
n := shortNode{Key: hexToCompactInPlace(st.key)}
|
n := shortNode{Key: hexToCompactInPlace(st.key)}
|
||||||
if len(st.children[0].val) < 32 {
|
if len(st.children[0].val) < 32 {
|
||||||
@ -378,10 +426,33 @@ func (t *StackTrie) hash(st *stNode, path []byte) {
|
|||||||
// input values.
|
// input values.
|
||||||
st.val = t.h.hashData(blob)
|
st.val = t.h.hashData(blob)
|
||||||
|
|
||||||
// Commit the trie node if the writer is configured.
|
// Short circuit if the stack trie is not configured for writing.
|
||||||
if t.options.Writer != nil {
|
if t.options.Writer == nil {
|
||||||
t.options.Writer(path, common.BytesToHash(st.val), blob)
|
return
|
||||||
}
|
}
|
||||||
|
// Skip committing if the node is on the left boundary and stackTrie is
|
||||||
|
// configured to filter the boundary.
|
||||||
|
if t.options.SkipLeftBoundary && bytes.HasPrefix(t.first, path) {
|
||||||
|
if t.options.boundaryGauge != nil {
|
||||||
|
t.options.boundaryGauge.Inc(1)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Skip committing if the node is on the right boundary and stackTrie is
|
||||||
|
// configured to filter the boundary.
|
||||||
|
if t.options.SkipRightBoundary && bytes.HasPrefix(t.last, path) {
|
||||||
|
if t.options.boundaryGauge != nil {
|
||||||
|
t.options.boundaryGauge.Inc(1)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Clean up the internal dangling nodes covered by the extension node.
|
||||||
|
// This should be done before writing the node to adhere to the committing
|
||||||
|
// order from bottom to top.
|
||||||
|
for _, path := range internal {
|
||||||
|
t.options.Cleaner(path)
|
||||||
|
}
|
||||||
|
t.options.Writer(path, common.BytesToHash(st.val), blob)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hash will firstly hash the entire trie if it's still not hashed and then commit
|
// Hash will firstly hash the entire trie if it's still not hashed and then commit
|
||||||
|
@ -19,11 +19,14 @@ package trie
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
"github.com/ethereum/go-ethereum/trie/testutil"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStackTrieInsertAndHash(t *testing.T) {
|
func TestStackTrieInsertAndHash(t *testing.T) {
|
||||||
@ -344,3 +347,87 @@ func TestStacktrieNotModifyValues(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildPartialTree(entries []*kv, t *testing.T) map[string]common.Hash {
|
||||||
|
var (
|
||||||
|
options = NewStackTrieOptions()
|
||||||
|
nodes = make(map[string]common.Hash)
|
||||||
|
)
|
||||||
|
var (
|
||||||
|
first int
|
||||||
|
last = len(entries) - 1
|
||||||
|
|
||||||
|
noLeft bool
|
||||||
|
noRight bool
|
||||||
|
)
|
||||||
|
// Enter split mode if there are at least two elements
|
||||||
|
if rand.Intn(5) != 0 {
|
||||||
|
for {
|
||||||
|
first = rand.Intn(len(entries))
|
||||||
|
last = rand.Intn(len(entries))
|
||||||
|
if first <= last {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if first != 0 {
|
||||||
|
noLeft = true
|
||||||
|
}
|
||||||
|
if last != len(entries)-1 {
|
||||||
|
noRight = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
options = options.WithSkipBoundary(noLeft, noRight, nil)
|
||||||
|
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
|
nodes[string(path)] = hash
|
||||||
|
})
|
||||||
|
tr := NewStackTrie(options)
|
||||||
|
|
||||||
|
for i := first; i <= last; i++ {
|
||||||
|
tr.MustUpdate(entries[i].k, entries[i].v)
|
||||||
|
}
|
||||||
|
tr.Commit()
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPartialStackTrie(t *testing.T) {
|
||||||
|
for round := 0; round < 100; round++ {
|
||||||
|
var (
|
||||||
|
n = rand.Intn(100) + 1
|
||||||
|
entries []*kv
|
||||||
|
)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
var val []byte
|
||||||
|
if rand.Intn(3) == 0 {
|
||||||
|
val = testutil.RandBytes(3)
|
||||||
|
} else {
|
||||||
|
val = testutil.RandBytes(32)
|
||||||
|
}
|
||||||
|
entries = append(entries, &kv{
|
||||||
|
k: testutil.RandBytes(32),
|
||||||
|
v: val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
slices.SortFunc(entries, (*kv).cmp)
|
||||||
|
|
||||||
|
var (
|
||||||
|
nodes = make(map[string]common.Hash)
|
||||||
|
options = NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
|
nodes[string(path)] = hash
|
||||||
|
})
|
||||||
|
)
|
||||||
|
tr := NewStackTrie(options)
|
||||||
|
|
||||||
|
for i := 0; i < len(entries); i++ {
|
||||||
|
tr.MustUpdate(entries[i].k, entries[i].v)
|
||||||
|
}
|
||||||
|
tr.Commit()
|
||||||
|
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
for path, hash := range buildPartialTree(entries, t) {
|
||||||
|
if nodes[path] != hash {
|
||||||
|
t.Errorf("%v, want %x, got %x", []byte(path), nodes[path], hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
26
trie/sync.go
26
trie/sync.go
@ -51,6 +51,18 @@ var (
|
|||||||
// lookupGauge is the metric to track how many trie node lookups are
|
// lookupGauge is the metric to track how many trie node lookups are
|
||||||
// performed to determine if node needs to be deleted.
|
// performed to determine if node needs to be deleted.
|
||||||
lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil)
|
lookupGauge = metrics.NewRegisteredGauge("trie/sync/lookup", nil)
|
||||||
|
|
||||||
|
// accountNodeSyncedGauge is the metric to track how many account trie
|
||||||
|
// node are written during the sync.
|
||||||
|
accountNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/account", nil)
|
||||||
|
|
||||||
|
// storageNodeSyncedGauge is the metric to track how many account trie
|
||||||
|
// node are written during the sync.
|
||||||
|
storageNodeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/nodes/storage", nil)
|
||||||
|
|
||||||
|
// codeSyncedGauge is the metric to track how many contract codes are
|
||||||
|
// written during the sync.
|
||||||
|
codeSyncedGauge = metrics.NewRegisteredGauge("trie/sync/codes", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncPath is a path tuple identifying a particular trie node either in a single
|
// SyncPath is a path tuple identifying a particular trie node either in a single
|
||||||
@ -362,10 +374,22 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
|
|||||||
// storage, returning any occurred error.
|
// storage, returning any occurred error.
|
||||||
func (s *Sync) Commit(dbw ethdb.Batch) error {
|
func (s *Sync) Commit(dbw ethdb.Batch) error {
|
||||||
// Flush the pending node writes into database batch.
|
// Flush the pending node writes into database batch.
|
||||||
|
var (
|
||||||
|
account int
|
||||||
|
storage int
|
||||||
|
)
|
||||||
for path, value := range s.membatch.nodes {
|
for path, value := range s.membatch.nodes {
|
||||||
owner, inner := ResolvePath([]byte(path))
|
owner, inner := ResolvePath([]byte(path))
|
||||||
|
if owner == (common.Hash{}) {
|
||||||
|
account += 1
|
||||||
|
} else {
|
||||||
|
storage += 1
|
||||||
|
}
|
||||||
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
|
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
|
||||||
}
|
}
|
||||||
|
accountNodeSyncedGauge.Inc(int64(account))
|
||||||
|
storageNodeSyncedGauge.Inc(int64(storage))
|
||||||
|
|
||||||
// Flush the pending node deletes into the database batch.
|
// Flush the pending node deletes into the database batch.
|
||||||
// Please note that each written and deleted node has a
|
// Please note that each written and deleted node has a
|
||||||
// unique path, ensuring no duplication occurs.
|
// unique path, ensuring no duplication occurs.
|
||||||
@ -377,6 +401,8 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
|
|||||||
for hash, value := range s.membatch.codes {
|
for hash, value := range s.membatch.codes {
|
||||||
rawdb.WriteCode(dbw, hash, value)
|
rawdb.WriteCode(dbw, hash, value)
|
||||||
}
|
}
|
||||||
|
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))
|
||||||
|
|
||||||
s.membatch = newSyncMemBatch() // reset the batch
|
s.membatch = newSyncMemBatch() // reset the batch
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user