cmd, core: prefetch reads too from tries if requested (#29807)

* cmd/utils, consensus/beacon, core/state: when configured via stub  flag: prefetch all reads from account/storage tries, terminate prefetcher synchronously.

* cmd, core/state: fix nil panic, fix error handling, prefetch nosnap too

* core/state: expand prefetcher metrics for reads and writes separately

* cmd/utils, eth: fix noop collect witness flag

---------

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
This commit is contained in:
jwasinger 2024-06-11 01:10:07 -07:00 committed by GitHub
parent 2eb185c92b
commit 85587d5ef2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 160 additions and 57 deletions

@ -156,6 +156,7 @@ var (
utils.BeaconGenesisRootFlag,
utils.BeaconGenesisTimeFlag,
utils.BeaconCheckpointFlag,
utils.CollectWitnessFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)
rpcFlags = []cli.Flag{

@ -604,6 +604,11 @@ var (
Usage: "Disables db compaction after import",
Category: flags.LoggingCategory,
}
CollectWitnessFlag = &cli.BoolFlag{
Name: "collectwitness",
Usage: "Enable state witness generation during block execution. Work in progress flag, don't use.",
Category: flags.MiscCategory,
}
// MISC settings
SyncTargetFlag = &cli.StringFlag{
@ -1760,6 +1765,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.Bool(VMEnableDebugFlag.Name)
}
if ctx.IsSet(CollectWitnessFlag.Name) {
cfg.EnableWitnessCollection = ctx.Bool(CollectWitnessFlag.Name)
}
if ctx.IsSet(RPCGlobalGasCapFlag.Name) {
cfg.RPCGasCap = ctx.Uint64(RPCGlobalGasCapFlag.Name)
@ -2190,7 +2198,10 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheGCFlag.Name) {
cache.TrieDirtyLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheGCFlag.Name) / 100
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name)}
vmcfg := vm.Config{
EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name),
EnableWitnessCollection: ctx.Bool(CollectWitnessFlag.Name),
}
if ctx.IsSet(VMTraceFlag.Name) {
if name := ctx.String(VMTraceFlag.Name); name != "" {
var config json.RawMessage

@ -1809,7 +1809,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", !bc.vmConfig.EnableWitnessCollection)
}
activeState = statedb

@ -230,6 +230,14 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
}
value.SetBytes(val)
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
}
}
s.originStorage[key] = value
return value
}
@ -293,7 +301,7 @@ func (s *stateObject) finalise() {
s.pendingStorage[key] = value
}
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}

@ -200,14 +200,14 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, noreads bool) {
if s.prefetcher != nil {
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, noreads)
// With the switch to the Proof-of-Stake consensus algorithm, block production
// rewards are now handled at the consensus layer. Consequently, a block may
@ -218,7 +218,7 @@ func (s *StateDB) StartPrefetcher(namespace string) {
// To prevent this, the account trie is always scheduled for prefetching once
// the prefetcher is constructed. For more details, see:
// https://github.com/ethereum/go-ethereum/issues/29880
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
}
}
@ -616,6 +616,14 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
return nil
}
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.prefetcher != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
log.Error("Failed to prefetch account", "addr", addr, "err", err)
}
}
// Insert into the live set
obj := newObject(s, addr, data)
s.setStateObject(obj)
@ -792,7 +800,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
}
}

@ -44,31 +44,49 @@ type triePrefetcher struct {
root common.Hash // Root hash of the account trie for metrics
fetchers map[string]*subfetcher // Subfetchers for each trie
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests
deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadMeter metrics.Meter
storageDupMeter metrics.Meter
storageWasteMeter metrics.Meter
accountLoadReadMeter metrics.Meter
accountLoadWriteMeter metrics.Meter
accountDupReadMeter metrics.Meter
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
}
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
return &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
term: make(chan struct{}),
noreads: noreads,
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
}
@ -98,19 +116,31 @@ func (p *triePrefetcher) report() {
fetcher.wait() // ensure the fetcher's idle before poking in its internals
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
}
}
}
@ -126,7 +156,11 @@ func (p *triePrefetcher) report() {
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
}
// Ensure the subfetcher is still alive
select {
case <-p.term:
@ -139,7 +173,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
return fetcher.schedule(keys)
return fetcher.schedule(keys, read)
}
// trie returns the trie matching the root hash, blocking until the fetcher of
@ -186,38 +220,51 @@ type subfetcher struct {
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes
tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption
seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end
seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
dupsCross int // Number of duplicate preload tasks via read-write-crosses
used [][]byte // Tracks the entries used in the end
}
// subfetcherTask is a trie path to prefetch, tagged with whether it originates
// from a read or a write request.
type subfetcherTask struct {
read bool
key []byte
}
// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seen: make(map[string]struct{}),
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenRead: make(map[string]struct{}),
seenWrite: make(map[string]struct{}),
}
go sf.loop()
return sf
}
// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) error {
func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
@ -226,7 +273,10 @@ func (sf *subfetcher) schedule(keys [][]byte) error {
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
for _, key := range keys {
key := key // closure for the append below
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
}
sf.lock.Unlock()
// Notify the background thread to execute scheduled tasks
@ -303,16 +353,36 @@ func (sf *subfetcher) loop() {
sf.lock.Unlock()
for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
key := string(task.key)
if task.read {
if _, ok := sf.seenRead[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsCross++
continue
}
} else {
sf.trie.GetStorage(sf.addr, task)
if _, ok := sf.seenRead[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsWrite++
continue
}
}
if len(task.key) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task.key))
} else {
sf.trie.GetStorage(sf.addr, task.key)
}
if task.read {
sf.seenRead[key] = struct{}{}
} else {
sf.seenWrite[key] = struct{}{}
}
sf.seen[string(task)] = struct{}{}
}
case <-sf.stop:

@ -47,15 +47,15 @@ func filledStateDB() *StateDB {
func TestUseAfterTerminate(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
skey := common.HexToHash("aaa")
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
prefetcher.terminate(false)
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
}
if tr := prefetcher.trie(common.Hash{}, db.originalRoot); tr == nil {

@ -33,6 +33,7 @@ type Config struct {
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled
EnableWitnessCollection bool // true if witness collection is enabled
}
// ScopeContext contains the things that are per-call, such as stack and memory,

@ -184,6 +184,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
EnableWitnessCollection: config.EnableWitnessCollection,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,

@ -141,6 +141,9 @@ type Config struct {
// Enables tracking of SHA3 preimages in the VM
EnablePreimageRecording bool
// Enables prefetching trie nodes for read operations too
EnableWitnessCollection bool `toml:"-"`
// Enables VM tracing
VMTrace string
VMTraceJsonConfig string