bsc/trie/hbss2pbss.go

244 lines
7.3 KiB
Go
Raw Permalink Normal View History

package trie
import (
"bytes"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie/trienode"
)
// Hbss2Pbss carries the state of one trie conversion run: it walks a trie and
// rewrites every visited node to disk keyed by its path (see writeNode).
// NOTE(review): the name presumably abbreviates "hash-based state scheme to
// path-based state scheme" — confirm.
type Hbss2Pbss struct {
	// totalNum counts resolved hash nodes. It is updated with
	// atomic.AddUint64 from several goroutines, so it is kept as the first
	// field: only the first word of an allocated struct is guaranteed to be
	// 64-bit aligned on 32-bit platforms (ARM, 386, 32-bit MIPS).
	totalNum uint64

	trie          *Trie       // traverse trie
	db            Database    // source of the disk database that nodes are written to
	blocknum      uint64      // written as the state ID once the traversal finishes
	root          node        // root of triedb
	stateRootHash common.Hash // state root that blocknum is associated with

	// concurrentQueue is used as a counting semaphore: its capacity bounds the
	// number of traversal goroutines that run at the same time.
	concurrentQueue chan struct{}
	// wg tracks every goroutine spawned during the traversal.
	wg sync.WaitGroup
}
// DEFAULT_TRIEDBCACHE_SIZE equals 1024 * 1024 * 1024 (1 << 30).
// NOTE(review): it is not referenced in the visible part of this file;
// presumably it is a default trie database cache size in bytes — confirm
// against the callers.
const DEFAULT_TRIEDBCACHE_SIZE = 1 << 30
// NewHbss2Pbss returns a converter that traverses tr and writes its nodes to
// the disk database behind db.
//
// stateRootHash and blocknum are recorded as the state ID mapping when Run
// completes. jobnum is the capacity of the semaphore that bounds the number
// of concurrently running traversal goroutines; a value of 0 is treated as 1.
//
// An error is returned when tr or its root node is nil.
func NewHbss2Pbss(tr *Trie, db Database, stateRootHash common.Hash, blocknum uint64, jobnum uint64) (*Hbss2Pbss, error) {
	if tr == nil {
		return nil, errors.New("trie is nil")
	}
	if tr.root == nil {
		return nil, errors.New("trie root is nil")
	}
	// A zero capacity would make the queue unbuffered. Every traversal
	// goroutine sends on the queue before it does any work and only receives
	// from it afterwards, so with no buffer each of them would block forever
	// and Run would never return from wg.Wait.
	if jobnum == 0 {
		jobnum = 1
	}
	return &Hbss2Pbss{
		trie:            tr,
		blocknum:        blocknum,
		db:              db,
		stateRootHash:   stateRootHash,
		root:            tr.root,
		concurrentQueue: make(chan struct{}, jobnum),
	}, nil
}
// resloveWithoutTrack expands n when it is a hashNode: the node blob is
// fetched through t.reader for the given prefix and hash, then decoded.
// Any other node kind is handed back untouched. A reader failure is returned
// as-is together with a nil node.
func (t *Trie) resloveWithoutTrack(n node, prefix []byte) (node, error) {
	ref, isHash := n.(hashNode)
	if !isHash {
		return n, nil
	}
	blob, err := t.reader.node(prefix, common.BytesToHash(ref))
	if err != nil {
		return nil, err
	}
	return mustDecodeNode(ref, blob), nil
}
// writeNode stores the blob of n in the disk database under pathKey.
// A zero owner hash selects the account-trie writer; any other owner selects
// the storage-trie writer for that owner. Each write is followed by a debug
// log entry.
// NOTE(review): the log arguments (hex path, Keccak hash of the blob) are
// evaluated on every call before log.Debug is entered.
func (h2p *Hbss2Pbss) writeNode(pathKey []byte, n *trienode.Node, owner common.Hash) {
	disk := h2p.db.DiskDB()
	if owner != (common.Hash{}) {
		rawdb.WriteStorageTrieNode(disk, owner, pathKey, n.Blob)
		log.Debug("WriteNodes storage node, ", "path: ", common.Bytes2Hex(pathKey), "owner: ", owner.String(), "Hash: ", n.Hash, "BlobHash: ", crypto.Keccak256Hash(n.Blob))
		return
	}
	rawdb.WriteAccountTrieNode(disk, pathKey, n.Blob)
	log.Debug("WriteNodes account node, ", "path: ", common.Bytes2Hex(pathKey), "Hash: ", n.Hash, "BlobHash: ", crypto.Keccak256Hash(n.Blob))
}
// Run statistics, external call
func (h2p *Hbss2Pbss) Run() {
log.Debug("Find Account Trie Tree, rootHash: ", h2p.trie.Hash().String(), "BlockNum: ", h2p.blocknum)
h2p.ConcurrentTraversal(h2p.trie, h2p.root, []byte{})
h2p.wg.Wait()
2023-09-27 15:01:49 +08:00
log.Info("Total", "complete", h2p.totalNum, "go routines Num", runtime.NumGoroutine, "h2p concurrentQueue", len(h2p.concurrentQueue))
rawdb.WritePersistentStateID(h2p.db.DiskDB(), h2p.blocknum)
rawdb.WriteStateID(h2p.db.DiskDB(), h2p.stateRootHash, h2p.blocknum)
}
// SubConcurrentTraversal runs ConcurrentTraversal for one sub-trie while
// holding a slot of the concurrency queue. The send blocks while the queue is
// full. Once the traversal returns, the slot is handed back and the wait
// group is notified; callers are expected to have called wg.Add(1) before
// starting this in a goroutine.
// NOTE(review): neither release is deferred, so both are skipped if the
// traversal panics.
func (h2p *Hbss2Pbss) SubConcurrentTraversal(theTrie *Trie, theNode node, path []byte) {
	slots := h2p.concurrentQueue

	slots <- struct{}{} // take a slot
	h2p.ConcurrentTraversal(theTrie, theNode, path)
	<-slots // give the slot back

	h2p.wg.Done()
}
// ConcurrentTraversal walks the sub-trie rooted at theNode, located at path
// inside theTrie, and writes every short and full node it visits to disk via
// writeNode, keyed by that path and theTrie.owner.
//
// Behaviour per node kind:
//   - nil: nothing to do.
//   - shortNode: written with its key in compact encoding, then the value is
//     traversed under path+key.
//   - fullNode: re-encoded with collapsed children and checked against its
//     cached hash (panics on mismatch), written, then each non-nil child is
//     traversed — in a new goroutine while the queue is less than half full,
//     otherwise inline.
//   - hashNode: resolved from the database and traversed; a resolve failure
//     is logged and the sub-trie is skipped.
//   - valueNode: if it decodes as an account with a non-empty storage root,
//     the storage trie is opened and traversed in a new goroutine.
//
// Any other node type panics.
func (h2p *Hbss2Pbss) ConcurrentTraversal(theTrie *Trie, theNode node, path []byte) {
	if theNode == nil {
		return
	}
	switch current := theNode.(type) {
	case *shortNode:
		collapsed := current.copy()
		collapsed.Key = hexToCompact(current.Key)
		hash, _ := current.cache()
		h2p.writeNode(path, trienode.New(common.BytesToHash(hash), nodeToBytes(collapsed)), theTrie.owner)
		h2p.ConcurrentTraversal(theTrie, current.Val, append(path, current.Key...))

	case *fullNode:
		// copy from trie/Committer (*committer).commit
		collapsed := current.copy()
		hash, _ := collapsed.cache()
		collapsed.Children = h2p.commitChildren(path, current)

		// Encode once; the same bytes are used for the check and the write.
		nodeBytes := nodeToBytes(collapsed)
		wantHash := common.BytesToHash(hash)
		if gotHash := crypto.Keccak256Hash(nodeBytes); wantHash != gotHash {
			log.Error("Hash is inconsistent", "hash", wantHash, "node hash", gotHash, "node", collapsed.fstring(""))
			panic("hash inconsistent.")
		}
		h2p.writeNode(path, trienode.New(wantHash, nodeBytes), theTrie.owner)

		for idx, child := range current.Children {
			if child == nil {
				continue
			}
			childPath := append(path, byte(idx))
			if len(h2p.concurrentQueue)*2 < cap(h2p.concurrentQueue) {
				h2p.wg.Add(1)
				// The goroutine outlives this iteration, and childPath may
				// share its backing array with path, so hand over a copy.
				dst := make([]byte, len(childPath))
				copy(dst, childPath)
				go h2p.SubConcurrentTraversal(theTrie, child, dst)
			} else {
				h2p.ConcurrentTraversal(theTrie, child, childPath)
			}
		}

	case hashNode:
		n, err := theTrie.resloveWithoutTrack(current, path)
		if err != nil {
			// NOTE(review): the sub-trie below this node is skipped and the
			// traversal carries on; Run still records the state ID afterwards.
			log.Error("Resolve HashNode", "error", err, "TrieRoot", theTrie.Hash(), "Path", path)
			return
		}
		h2p.ConcurrentTraversal(theTrie, n, path)

		totalNum := atomic.AddUint64(&h2p.totalNum, 1)
		if totalNum%100000 == 0 {
			log.Info("Converting ", "Complete progress", totalNum, "go routines Num", runtime.NumGoroutine(), "h2p concurrentQueue", len(h2p.concurrentQueue))
		}

	case valueNode:
		if !hasTerm(path) {
			log.Info("ValueNode miss path term", "path", common.Bytes2Hex(path))
			return
		}
		var account types.StateAccount
		if err := rlp.Decode(bytes.NewReader(current), &account); err != nil {
			// The value does not decode as an account; nothing more to walk.
			return
		}
		if account.Root == (common.Hash{}) || account.Root == types.EmptyRootHash {
			// The account has no storage trie.
			return
		}
		ownerAddress := common.BytesToHash(hexToCompact(path))
		tr, err := New(StorageTrieID(h2p.stateRootHash, ownerAddress, account.Root), h2p.db)
		if err != nil {
			log.Error("New Storage trie error", "err", err, "root", account.Root.String(), "owner", ownerAddress.String())
			return
		}
		log.Debug("Find Contract Trie Tree", "rootHash: ", tr.Hash().String())
		h2p.wg.Add(1)
		go h2p.SubConcurrentTraversal(tr, tr.root, []byte{})

	default:
		panic(errors.New("Invalid node type to traverse."))
	}
}
2024-05-09 16:05:46 +08:00
// commitChildren builds the collapsed child array of the full node n.
// Adapted from trie/Committer (*committer).commit.
//
// For slots 0..15: empty slots stay empty, hash nodes are kept as they are,
// and every other child is collapsed through commit under path+index.
// Slot 16 is carried over unchanged, since it may hold a value node.
func (h2p *Hbss2Pbss) commitChildren(path []byte, n *fullNode) [17]node {
	var collapsed [17]node
	for idx, slot := range n.Children[:16] {
		switch kid := slot.(type) {
		case nil:
			// Empty slot, leave it nil.
		case hashNode:
			// Already a hash reference, store it directly.
			collapsed[idx] = kid
		default:
			collapsed[idx] = h2p.commit(append(path, byte(idx)), kid)
		}
	}
	// The 17th slot is copied verbatim; a nil entry simply stays nil.
	collapsed[16] = n.Children[16]
	return collapsed
}
// commit returns the collapsed form of n.
//
// When n carries a cached hash and is not dirty, that hash is returned.
// Otherwise a short node is copied with its key in compact encoding (and its
// value collapsed if that value is a full node), a full node is copied with
// its children collapsed via commitChildren, and a hash node is returned
// unchanged. Any other node kind (nil, value node) panics.
func (h2p *Hbss2Pbss) commit(path []byte, n node) node {
	// A clean node with a cached hash needs no further work.
	if cached, dirty := n.cache(); cached != nil && !dirty {
		return cached
	}
	switch typed := n.(type) {
	case hashNode:
		return typed
	case *fullNode:
		kids := h2p.commitChildren(path, typed)
		result := typed.copy()
		result.Children = kids
		return result
	case *shortNode:
		result := typed.copy()
		// Only a full-node value is collapsed recursively; any other value
		// kind is kept as copied.
		if full, isFull := typed.Val.(*fullNode); isFull {
			result.Val = h2p.commit(append(path, typed.Key...), full)
		}
		// hexToCompact produces a fresh key for the copy.
		result.Key = hexToCompact(typed.Key)
		return result
	default:
		// nil, valuenode shouldn't be committed
		panic(fmt.Sprintf("%T: invalid node: %v", n, n))
	}
}