trie: remove inconsistent trie nodes during sync in path mode (#28595)

This fixes a database corruption issue that could occur during state healing.
When sync is aborted while certain modifications were already committed, and a
reorg occurs, the database would contain incorrect trie nodes stored by path.
These nodes need to detected/deleted in order to obtain a complete and fully correct state
after state healing.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
rjl493456442 2023-12-08 21:28:23 +08:00 committed by GitHub
parent d98d70f670
commit e206d3f897
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 270 additions and 84 deletions

@ -273,9 +273,13 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
b.Put([]byte("5"), nil)
b.Delete([]byte("1"))
b.Put([]byte("6"), nil)
b.Delete([]byte("3"))
b.Delete([]byte("3")) // delete then put
b.Put([]byte("3"), nil)
b.Put([]byte("7"), nil) // put then delete
b.Delete([]byte("7"))
if err := b.Write(); err != nil {
t.Fatal(err)
}

@ -119,7 +119,6 @@ type nodeRequest struct {
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete
deletes [][]byte // List of internal path segments for trie nodes to delete
parent *nodeRequest // Parent state node referencing this entry
deps int // Number of dependencies before allowed to commit this node
@ -146,38 +145,85 @@ type CodeSyncResult struct {
Data []byte // Data content of the retrieved bytecode
}
// nodeOp represents an operation upon the trie node. It can either represent a
// deletion to the specific node or a node write for persisting retrieved node.
type nodeOp struct {
owner common.Hash // identifier of the trie (empty for account trie)
path []byte // path from the root to the specified node.
blob []byte // the content of the node (nil for deletion)
hash common.Hash // hash of the node content (empty for node deletion)
}
// isDelete indicates if the operation is a database deletion.
func (op *nodeOp) isDelete() bool {
return len(op.blob) == 0
}
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
nodes map[string][]byte // In-memory membatch of recently completed nodes
hashes map[string]common.Hash // Hashes of recently completed nodes
deletes map[string]struct{} // List of paths for trie node to delete
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
scheme string // State scheme identifier
codes map[common.Hash][]byte // In-memory batch of recently completed codes
nodes []nodeOp // In-memory batch of recently completed/deleted nodes
size uint64 // Estimated batch-size of in-memory data.
}
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
func newSyncMemBatch(scheme string) *syncMemBatch {
return &syncMemBatch{
nodes: make(map[string][]byte),
hashes: make(map[string]common.Hash),
deletes: make(map[string]struct{}),
scheme: scheme,
codes: make(map[common.Hash][]byte),
}
}
// hasNode reports the trie node with specific path is already cached.
func (batch *syncMemBatch) hasNode(path []byte) bool {
_, ok := batch.nodes[string(path)]
return ok
}
// hasCode reports the contract code with specific hash is already cached.
func (batch *syncMemBatch) hasCode(hash common.Hash) bool {
_, ok := batch.codes[hash]
return ok
}
// addCode caches a contract code database write operation.
func (batch *syncMemBatch) addCode(hash common.Hash, code []byte) {
batch.codes[hash] = code
batch.size += common.HashLength + uint64(len(code))
}
// addNode caches a node database write operation.
func (batch *syncMemBatch) addNode(owner common.Hash, path []byte, blob []byte, hash common.Hash) {
if batch.scheme == rawdb.PathScheme {
if owner == (common.Hash{}) {
batch.size += uint64(len(path) + len(blob))
} else {
batch.size += common.HashLength + uint64(len(path)+len(blob))
}
} else {
batch.size += common.HashLength + uint64(len(blob))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
blob: blob,
hash: hash,
})
}
// delNode caches a node database delete operation.
func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) {
if batch.scheme != rawdb.PathScheme {
log.Error("Unexpected node deletion", "owner", owner, "path", path, "scheme", batch.scheme)
return // deletion is not supported in hash mode.
}
if owner == (common.Hash{}) {
batch.size += uint64(len(path))
} else {
batch.size += common.HashLength + uint64(len(path))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
})
}
// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
@ -196,7 +242,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
ts := &Sync{
scheme: scheme,
database: database,
membatch: newSyncMemBatch(),
membatch: newSyncMemBatch(scheme),
nodeReqs: make(map[string]*nodeRequest),
codeReqs: make(map[common.Hash]*codeRequest),
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
@ -210,16 +256,17 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
// parent for completion tracking. The given path is a unique node path in
// hex format and contain all the parent path if it's layered trie node.
func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, parentPath []byte, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root == types.EmptyRootHash {
return
}
if s.membatch.hasNode(path) {
return
}
owner, inner := ResolvePath(path)
if rawdb.HasTrieNode(s.database, owner, inner, root, s.scheme) {
exist, inconsistent := s.hasNode(owner, inner, root)
if exist {
// The entire subtrie is already present in the database.
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
s.membatch.delNode(owner, inner)
}
// Assemble the new sub-trie sync request
req := &nodeRequest{
@ -371,39 +418,42 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
}
// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning any occurred error.
// storage, returning any occurred error. The whole data set will be flushed
// in an atomic database batch.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Flush the pending node writes into database batch.
var (
account int
storage int
)
for path, value := range s.membatch.nodes {
owner, inner := ResolvePath([]byte(path))
if owner == (common.Hash{}) {
for _, op := range s.membatch.nodes {
if op.isDelete() {
// node deletion is only supported in path mode.
if op.owner == (common.Hash{}) {
rawdb.DeleteAccountTrieNode(dbw, op.path)
} else {
rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path)
}
deletionGauge.Inc(1)
} else {
if op.owner == (common.Hash{}) {
account += 1
} else {
storage += 1
}
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme)
}
}
accountNodeSyncedGauge.Inc(int64(account))
storageNodeSyncedGauge.Inc(int64(storage))
// Flush the pending node deletes into the database batch.
// Please note that each written and deleted node has a
// unique path, ensuring no duplication occurs.
for path := range s.membatch.deletes {
owner, inner := ResolvePath([]byte(path))
rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme)
}
// Flush the pending code writes into database batch.
for hash, value := range s.membatch.codes {
rawdb.WriteCode(dbw, hash, value)
}
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))
s.membatch = newSyncMemBatch() // reset the batch
s.membatch = newSyncMemBatch(s.scheme) // reset the batch
return nil
}
@ -476,12 +526,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// child as invalid. This is essential in the case of path mode
// scheme; otherwise, state healing might overwrite existing child
// nodes silently while leaving a dangling parent node within the
// range of this internal path on disk. This would break the
// guarantee for state healing.
// range of this internal path on disk and the persistent state
// ends up with a very weird situation that nodes on the same path
// are not inconsistent while they all present in disk. This property
// would break the guarantee for state healing.
//
// While it's possible for this shortNode to overwrite a previously
// existing full node, the other branches of the fullNode can be
// retained as they remain untouched and complete.
// retained as they are not accessible with the new shortNode, and
// also the whole sub-trie is still untouched and complete.
//
// This step is only necessary for path mode, as there is no deletion
// in hash mode at all.
@ -498,8 +551,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...))
}
if exists {
req.deletes = append(req.deletes, key[:i])
deletionGauge.Inc(1)
s.membatch.delNode(owner, append(inner, key[:i]...))
log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...))
}
}
@ -521,6 +573,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
var (
missing = make(chan *nodeRequest, len(children))
pending sync.WaitGroup
batchMu sync.Mutex
)
for _, child := range children {
// Notify any external watcher of a new key/value node
@ -538,34 +591,32 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
}
}
}
// If the child references another node, resolve or schedule
// If the child references another node, resolve or schedule.
// We check all children concurrently.
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
if s.membatch.hasNode(child.path) {
continue
}
// Check the presence of children concurrently
path := child.path
hash := common.BytesToHash(node)
pending.Add(1)
go func(child childNode) {
go func() {
defer pending.Done()
// If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
var (
chash = common.BytesToHash(node)
owner, inner = ResolvePath(child.path)
)
if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) {
owner, inner := ResolvePath(path)
exist, inconsistent := s.hasNode(owner, inner, hash)
if exist {
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
batchMu.Lock()
s.membatch.delNode(owner, inner)
batchMu.Unlock()
}
// Locally unknown node, schedule for retrieval
missing <- &nodeRequest{
path: child.path,
hash: chash,
path: path,
hash: hash,
parent: req,
callback: req.callback,
}
}(child)
}()
}
}
pending.Wait()
@ -587,21 +638,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// committed themselves.
func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// Write the node content to the membatch
s.membatch.nodes[string(req.path)] = req.data
s.membatch.hashes[string(req.path)] = req.hash
owner, path := ResolvePath(req.path)
s.membatch.addNode(owner, path, req.data, req.hash)
// The size tracking refers to the db-batch, not the in-memory data.
if s.scheme == rawdb.PathScheme {
s.membatch.size += uint64(len(req.path) + len(req.data))
} else {
s.membatch.size += common.HashLength + uint64(len(req.data))
}
// Delete the internal nodes which are marked as invalid
for _, segment := range req.deletes {
path := append(req.path, segment...)
s.membatch.deletes[string(path)] = struct{}{}
s.membatch.size += uint64(len(path))
}
// Removed the completed node request
delete(s.nodeReqs, string(req.path))
s.fetches[len(req.path)]--
@ -622,8 +662,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// committed themselves.
func (s *Sync) commitCodeRequest(req *codeRequest) error {
// Write the node content to the membatch
s.membatch.codes[req.hash] = req.data
s.membatch.size += common.HashLength + uint64(len(req.data))
s.membatch.addCode(req.hash, req.data)
// Removed the completed code request
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--
@ -639,6 +680,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
return nil
}
// hasNode reports whether the specified trie node is present in the database.
// 'exists' is true when the node exists in the database and matches the given root
// hash. The 'inconsistent' return value is true when the node exists but does not
// match the expected hash.
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
// If node is running with hash scheme, check the presence with node hash.
if s.scheme == rawdb.HashScheme {
return rawdb.HasLegacyTrieNode(s.database, hash), false
}
// If node is running with path scheme, check the presence with node path.
var blob []byte
var dbHash common.Hash
if owner == (common.Hash{}) {
blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path)
} else {
blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path)
}
exists = hash == dbHash
inconsistent = !exists && len(blob) != 0
return exists, inconsistent
}
// ResolvePath resolves the provided composite node path by separating the
// path in account trie if it's existent.
func ResolvePath(path []byte) (common.Hash, []byte) {

@ -684,8 +684,11 @@ func testSyncOrdering(t *testing.T, scheme string) {
}
}
}
func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database) {
syncWithHookWriter(t, root, db, srcDb, nil)
}
func syncWithHookWriter(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database, hookWriter ethdb.KeyValueWriter) {
// Create a destination trie and sync with the scheduler
sched := NewSync(root, db, nil, srcDb.Scheme())
@ -723,8 +726,11 @@ func syncWith(t *testing.T, root common.Hash, db ethdb.Database, srcDb *Database
if err := sched.Commit(batch); err != nil {
t.Fatalf("failed to commit data: %v", err)
}
if hookWriter != nil {
batch.Replay(hookWriter)
} else {
batch.Write()
}
paths, nodes, _ = sched.Missing(0)
elements = elements[:0]
for i := 0; i < len(paths); i++ {
@ -894,3 +900,116 @@ func testPivotMove(t *testing.T, scheme string, tiny bool) {
syncWith(t, rootC, destDisk, srcTrieDB)
checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateC, true)
}
func TestSyncAbort(t *testing.T) {
testSyncAbort(t, rawdb.PathScheme)
testSyncAbort(t, rawdb.HashScheme)
}
type hookWriter struct {
db ethdb.KeyValueStore
filter func(key []byte, value []byte) bool
}
// Put inserts the given value into the key-value data store.
func (w *hookWriter) Put(key []byte, value []byte) error {
if w.filter != nil && w.filter(key, value) {
return nil
}
return w.db.Put(key, value)
}
// Delete removes the key from the key-value data store.
func (w *hookWriter) Delete(key []byte) error {
return w.db.Delete(key)
}
func testSyncAbort(t *testing.T, scheme string) {
var (
srcDisk = rawdb.NewMemoryDatabase()
srcTrieDB = newTestDatabase(srcDisk, scheme)
srcTrie, _ = New(TrieID(types.EmptyRootHash), srcTrieDB)
deleteFn = func(key []byte, tr *Trie, states map[string][]byte) {
tr.Delete(key)
delete(states, string(key))
}
writeFn = func(key []byte, val []byte, tr *Trie, states map[string][]byte) {
if val == nil {
val = randBytes(32)
}
tr.Update(key, val)
states[string(key)] = common.CopyBytes(val)
}
copyStates = func(states map[string][]byte) map[string][]byte {
cpy := make(map[string][]byte)
for k, v := range states {
cpy[k] = v
}
return cpy
}
)
var (
stateA = make(map[string][]byte)
key = randBytes(32)
val = randBytes(32)
)
for i := 0; i < 256; i++ {
writeFn(randBytes(32), nil, srcTrie, stateA)
}
writeFn(key, val, srcTrie, stateA)
rootA, nodesA, _ := srcTrie.Commit(false)
if err := srcTrieDB.Update(rootA, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodesA), nil); err != nil {
panic(err)
}
if err := srcTrieDB.Commit(rootA, false); err != nil {
panic(err)
}
// Create a destination trie and sync with the scheduler
destDisk := rawdb.NewMemoryDatabase()
syncWith(t, rootA, destDisk, srcTrieDB)
checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateA, true)
// Delete the element from the trie
stateB := copyStates(stateA)
srcTrie, _ = New(TrieID(rootA), srcTrieDB)
deleteFn(key, srcTrie, stateB)
rootB, nodesB, _ := srcTrie.Commit(false)
if err := srcTrieDB.Update(rootB, rootA, 0, trienode.NewWithNodeSet(nodesB), nil); err != nil {
panic(err)
}
if err := srcTrieDB.Commit(rootB, false); err != nil {
panic(err)
}
// Sync the new state, but never persist the new root node. Before the
// fix #28595, the original old root node will still be left in database
// which breaks the next healing cycle.
syncWithHookWriter(t, rootB, destDisk, srcTrieDB, &hookWriter{db: destDisk, filter: func(key []byte, value []byte) bool {
if scheme == rawdb.HashScheme {
return false
}
if len(value) == 0 {
return false
}
ok, path := rawdb.ResolveAccountTrieNodeKey(key)
return ok && len(path) == 0
}})
// Add elements to expand trie
stateC := copyStates(stateB)
srcTrie, _ = New(TrieID(rootB), srcTrieDB)
writeFn(key, val, srcTrie, stateC)
rootC, nodesC, _ := srcTrie.Commit(false)
if err := srcTrieDB.Update(rootC, rootB, 0, trienode.NewWithNodeSet(nodesC), nil); err != nil {
panic(err)
}
if err := srcTrieDB.Commit(rootC, false); err != nil {
panic(err)
}
syncWith(t, rootC, destDisk, srcTrieDB)
checkTrieContents(t, destDisk, scheme, srcTrie.Hash().Bytes(), stateC, true)
}