From f4dc7530b1c63d1c223a1a339f63719d6f6c2c0d Mon Sep 17 00:00:00 2001 From: Martin HS Date: Mon, 14 Oct 2024 13:32:15 +0200 Subject: [PATCH] trie: concurrent commit (#30545) This change makes the trie commit operation concurrent, if the number of changes exceed 100. Co-authored-by: stevemilk Co-authored-by: Gary Rong --- trie/committer.go | 38 +++++++++++---- trie/trie.go | 23 +++++++--- trie/trie_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++ trie/trienode/node.go | 18 ++++++++ 4 files changed, 168 insertions(+), 15 deletions(-) diff --git a/trie/committer.go b/trie/committer.go index 863e7bafdc..6c4374ccfd 100644 --- a/trie/committer.go +++ b/trie/committer.go @@ -18,6 +18,7 @@ package trie import ( "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/trie/trienode" @@ -42,12 +43,12 @@ func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) * } // Commit collapses a node down into a hash node. -func (c *committer) Commit(n node) hashNode { - return c.commit(nil, n).(hashNode) +func (c *committer) Commit(n node, parallel bool) hashNode { + return c.commit(nil, n, parallel).(hashNode) } // commit collapses a node down into a hash node and returns it. -func (c *committer) commit(path []byte, n node) node { +func (c *committer) commit(path []byte, n node, parallel bool) node { // if this path is clean, use available cached data hash, dirty := n.cache() if hash != nil && !dirty { @@ -62,7 +63,7 @@ func (c *committer) commit(path []byte, n node) node { // If the child is fullNode, recursively commit, // otherwise it can only be hashNode or valueNode. if _, ok := cn.Val.(*fullNode); ok { - collapsed.Val = c.commit(append(path, cn.Key...), cn.Val) + collapsed.Val = c.commit(append(path, cn.Key...), cn.Val, false) } // The key needs to be copied, since we're adding it to the // modified nodeset. @@ -73,7 +74,7 @@ func (c *committer) commit(path []byte, n node) node { } return collapsed case *fullNode: - hashedKids := c.commitChildren(path, cn) + hashedKids := c.commitChildren(path, cn, parallel) collapsed := cn.copy() collapsed.Children = hashedKids @@ -91,8 +92,12 @@ func (c *committer) commit(path []byte, n node) node { } // commitChildren commits the children of the given fullnode -func (c *committer) commitChildren(path []byte, n *fullNode) [17]node { - var children [17]node +func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) [17]node { + var ( + wg sync.WaitGroup + nodesMu sync.Mutex + children [17]node + ) for i := 0; i < 16; i++ { child := n.Children[i] if child == nil { @@ -108,7 +113,24 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node { // Commit the child recursively and store the "hashed" value. // Note the returned node can be some embedded nodes, so it's // possible the type is not hashNode. - children[i] = c.commit(append(path, byte(i)), child) + if !parallel { + children[i] = c.commit(append(path, byte(i)), child, false) + } else { + wg.Add(1) + go func(index int) { + p := append(path, byte(index)) + childSet := trienode.NewNodeSet(c.nodes.Owner) + childCommitter := newCommitter(childSet, c.tracer, c.collectLeaf) + children[index] = childCommitter.commit(p, child, false) + nodesMu.Lock() + c.nodes.MergeSet(childSet) + nodesMu.Unlock() + wg.Done() + }(i) + } + } + if parallel { + wg.Wait() } // For the 17th child, it's possible the type is valuenode. if n.Children[16] != nil { diff --git a/trie/trie.go b/trie/trie.go index 885b6b7962..372684683c 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -49,6 +49,9 @@ type Trie struct { // actually unhashed nodes. unhashed int + // uncommitted is the number of updates since last commit. + uncommitted int + // reader is the handler trie can retrieve nodes from. reader *trieReader @@ -64,12 +67,13 @@ func (t *Trie) newFlag() nodeFlag { // Copy returns a copy of Trie. func (t *Trie) Copy() *Trie { return &Trie{ - root: t.root, - owner: t.owner, - committed: t.committed, - unhashed: t.unhashed, - reader: t.reader, - tracer: t.tracer.copy(), + root: t.root, + owner: t.owner, + committed: t.committed, + reader: t.reader, + tracer: t.tracer.copy(), + uncommitted: t.uncommitted, + unhashed: t.unhashed, } } @@ -309,6 +313,7 @@ func (t *Trie) Update(key, value []byte) error { func (t *Trie) update(key, value []byte) error { t.unhashed++ + t.uncommitted++ k := keybytesToHex(key) if len(value) != 0 { _, n, err := t.insert(t.root, nil, k, valueNode(value)) @@ -422,6 +427,7 @@ func (t *Trie) Delete(key []byte) error { if t.committed { return ErrCommitted } + t.uncommitted++ t.unhashed++ k := keybytesToHex(key) _, n, err := t.delete(t.root, nil, k) @@ -642,7 +648,9 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) { for _, path := range t.tracer.deletedNodes() { nodes.AddNode([]byte(path), trienode.NewDeleted()) } - t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root) + // If the number of changes is below 100, we let one thread handle it + t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root, t.uncommitted > 100) + t.uncommitted = 0 return rootHash, nodes } @@ -678,6 +686,7 @@ func (t *Trie) Reset() { t.root = nil t.owner = common.Hash{} t.unhashed = 0 + t.uncommitted = 0 t.tracer.reset() t.committed = false } diff --git a/trie/trie_test.go b/trie/trie_test.go index 505b517bc5..9b2530bdd4 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -26,6 +26,7 @@ import ( "math/rand" "reflect" "sort" + "strings" "testing" "testing/quick" @@ -35,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/internal/testrand" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/holiman/uint256" @@ -1206,3 +1208,105 @@ func FuzzTrie(f *testing.F) { } }) } + +func BenchmarkCommit(b *testing.B) { + benchmarkCommit(b, 100) + benchmarkCommit(b, 500) + benchmarkCommit(b, 2000) + benchmarkCommit(b, 5000) +} + +func benchmarkCommit(b *testing.B, n int) { + b.Run(fmt.Sprintf("commit-%vnodes-sequential", n), func(b *testing.B) { + testCommit(b, n, false) + }) + b.Run(fmt.Sprintf("commit-%vnodes-parallel", n), func(b *testing.B) { + testCommit(b, n, true) + }) +} + +func testCommit(b *testing.B, n int, parallel bool) { + tries := make([]*Trie, b.N) + for i := 0; i < b.N; i++ { + tries[i] = NewEmpty(nil) + for j := 0; j < n; j++ { + key := testrand.Bytes(32) + val := testrand.Bytes(32) + tries[i].Update(key, val) + } + tries[i].Hash() + if !parallel { + tries[i].uncommitted = 0 + } + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < len(tries); i++ { + tries[i].Commit(true) + } +} + +func TestCommitCorrect(t *testing.T) { + var paraTrie = NewEmpty(nil) + var refTrie = NewEmpty(nil) + + for j := 0; j < 5000; j++ { + key := testrand.Bytes(32) + val := testrand.Bytes(32) + paraTrie.Update(key, val) + refTrie.Update(common.CopyBytes(key), common.CopyBytes(val)) + } + paraTrie.Hash() + refTrie.Hash() + refTrie.uncommitted = 0 + + haveRoot, haveNodes := paraTrie.Commit(true) + wantRoot, wantNodes := refTrie.Commit(true) + + if haveRoot != wantRoot { + t.Fatalf("have %x want %x", haveRoot, wantRoot) + } + have := printSet(haveNodes) + want := printSet(wantNodes) + if have != want { + i := 0 + for i = 0; i < len(have); i++ { + if have[i] != want[i] { + break + } + } + if i > 100 { + i -= 100 + } + t.Fatalf("have != want\nhave %q\nwant %q", have[i:], want[i:]) + } +} +func printSet(set *trienode.NodeSet) string { + var out = new(strings.Builder) + fmt.Fprintf(out, "nodeset owner: %v\n", set.Owner) + var paths []string + for k := range set.Nodes { + paths = append(paths, k) + } + sort.Strings(paths) + + for _, path := range paths { + n := set.Nodes[path] + // Deletion + if n.IsDeleted() { + fmt.Fprintf(out, " [-]: %x\n", path) + continue + } + // Insertion or update + fmt.Fprintf(out, " [+/*]: %x -> %v \n", path, n.Hash) + } + sort.Slice(set.Leaves, func(i, j int) bool { + a := set.Leaves[i] + b := set.Leaves[j] + return bytes.Compare(a.Parent[:], b.Parent[:]) < 0 + }) + for _, n := range set.Leaves { + fmt.Fprintf(out, "[leaf]: %v\n", n) + } + return out.String() +} diff --git a/trie/trienode/node.go b/trie/trienode/node.go index 09f355f3b5..7debe6ecbc 100644 --- a/trie/trienode/node.go +++ b/trie/trienode/node.go @@ -18,6 +18,7 @@ package trienode import ( "fmt" + "maps" "sort" "strings" @@ -99,6 +100,23 @@ func (set *NodeSet) AddNode(path []byte, n *Node) { set.Nodes[string(path)] = n } +// MergeSet merges this 'set' with 'other'. It assumes that the sets are disjoint, +// and thus does not deduplicate data (count deletes, dedup leaves etc). +func (set *NodeSet) MergeSet(other *NodeSet) error { + if set.Owner != other.Owner { + return fmt.Errorf("nodesets belong to different owner are not mergeable %x-%x", set.Owner, other.Owner) + } + maps.Copy(set.Nodes, other.Nodes) + + set.deletes += other.deletes + set.updates += other.updates + + // Since we assume the sets are disjoint, we can safely append leaves + // like this without deduplication. + set.Leaves = append(set.Leaves, other.Leaves...) + return nil +} + // Merge adds a set of nodes into the set. func (set *NodeSet) Merge(owner common.Hash, nodes map[string]*Node) error { if set.Owner != owner {