From 2ac83e197bc6417231d6faa2027486cf3e00b6d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 13 May 2024 15:47:45 +0300 Subject: [PATCH] core/state: blocking prefetcher on term signal, parallel updates (#29519) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * core/state: trie prefetcher change: calling trie() doesn't stop the associated subfetcher Co-authored-by: Martin HS Co-authored-by: Péter Szilágyi * core/state: improve prefetcher * core/state: restore async prefetcher stask scheduling * core/state: finish prefetching async and process storage updates async * core/state: don't use the prefetcher for missing snapshot items * core/state: remove update concurrency for Verkle tries * core/state: add some termination checks to prefetcher async shutdowns * core/state: differentiate db tries and prefetched tries * core/state: teh teh teh --------- Co-authored-by: Jared Wasinger Co-authored-by: Martin HS Co-authored-by: Gary Rong --- core/blockchain.go | 8 +- core/state/state_object.go | 97 ++++++--- core/state/statedb.go | 112 ++++++----- core/state/trie_prefetcher.go | 302 +++++++++++++---------------- core/state/trie_prefetcher_test.go | 65 +------ 5 files changed, 279 insertions(+), 305 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 56e00e85b6..7c8ab3abc4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1805,8 +1805,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } statedb.SetLogger(bc.logger) - // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain") + // If we are past Byzantium, enable prefetching to pull in trie node paths + // while processing transactions. Before Byzantium the prefetcher is mostly + // useless due to the intermediate root hashing after each transaction. + if bc.chainConfig.IsByzantium(block.Number()) { + statedb.StartPrefetcher("chain") + } activeState = statedb // If we have a followup block, run that against the current state to pre-cache diff --git a/core/state/state_object.go b/core/state/state_object.go index cc42d06879..3239be368c 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "maps" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -33,6 +34,14 @@ import ( "github.com/holiman/uint256" ) +// hasherPool holds a pool of hashers used by state objects during concurrent +// trie updates. +var hasherPool = sync.Pool{ + New: func() interface{} { + return crypto.NewKeccakState() + }, +} + type Storage map[common.Hash]common.Hash func (s Storage) Copy() Storage { @@ -118,27 +127,39 @@ func (s *stateObject) touch() { } } -// getTrie returns the associated storage trie. The trie will be opened -// if it's not loaded previously. An error will be returned if trie can't -// be loaded. +// getTrie returns the associated storage trie. The trie will be opened if it' +// not loaded previously. An error will be returned if trie can't be loaded. +// +// If a new trie is opened, it will be cached within the state object to allow +// subsequent reads to expand the same trie instead of reloading from disk. func (s *stateObject) getTrie() (Trie, error) { if s.trie == nil { - // Try fetching from prefetcher first - if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil { - // When the miner is creating the pending state, there is no prefetcher - s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root) - } - if s.trie == nil { - tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie) - if err != nil { - return nil, err - } - s.trie = tr + tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie) + if err != nil { + return nil, err } + s.trie = tr } return s.trie, nil } +// getPrefetchedTrie returns the associated trie, as populated by the prefetcher +// if it's available. +// +// Note, opposed to getTrie, this method will *NOT* blindly cache the resulting +// trie in the state object. The caller might want to do that, but it's cleaner +// to break the hidden interdependency between retrieving tries from the db or +// from the prefetcher. +func (s *stateObject) getPrefetchedTrie() (Trie, error) { + // If there's nothing to meaningfully return, let the user figure it out by + // pulling the trie from disk. + if s.data.Root == types.EmptyRootHash || s.db.prefetcher == nil { + return nil, nil + } + // Attempt to retrieve the trie from the pretecher + return s.db.prefetcher.trie(s.addrHash, s.data.Root) +} + // GetState retrieves a value from the account storage trie. func (s *stateObject) GetState(key common.Hash) common.Hash { value, _ := s.getState(key) @@ -248,7 +269,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common // finalise moves all dirty storage slots into the pending area to be hashed or // committed later. It is invoked at the end of every transaction. -func (s *stateObject) finalise(prefetch bool) { +func (s *stateObject) finalise() { slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage)) for key, value := range s.dirtyStorage { // If the slot is different from its original value, move it into the @@ -263,8 +284,10 @@ func (s *stateObject) finalise(prefetch bool) { delete(s.pendingStorage, key) } } - if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash { - s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch) + if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash { + if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil { + log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err) + } } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) @@ -283,25 +306,43 @@ func (s *stateObject) finalise(prefetch bool) { // storage change at all. func (s *stateObject) updateTrie() (Trie, error) { // Make sure all dirty slots are finalized into the pending storage area - s.finalise(false) + s.finalise() // Short circuit if nothing changed, don't bother with hashing anything if len(s.pendingStorage) == 0 { return s.trie, nil } + // Retrieve a pretecher populated trie, or fall back to the database + tr, err := s.getPrefetchedTrie() + switch { + case err != nil: + // Fetcher retrieval failed, something's very wrong, abort + s.db.setError(err) + return nil, err + + case tr == nil: + // Fetcher not running or empty trie, fallback to the database trie + tr, err = s.getTrie() + if err != nil { + s.db.setError(err) + return nil, err + } + + default: + // Prefetcher returned a live trie, swap it out for the current one + s.trie = tr + } // The snapshot storage map for the object var ( storage map[common.Hash][]byte origin map[common.Hash][]byte ) - tr, err := s.getTrie() - if err != nil { - s.db.setError(err) - return nil, err - } // Insert all the pending storage updates into the trie usedStorage := make([][]byte, 0, len(s.pendingStorage)) + hasher := hasherPool.Get().(crypto.KeccakState) + defer hasherPool.Put(hasher) + // Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes // in circumstances similar to the following: // @@ -330,26 +371,30 @@ func (s *stateObject) updateTrie() (Trie, error) { s.db.setError(err) return nil, err } - s.db.StorageUpdated += 1 + s.db.StorageUpdated.Add(1) } else { deletions = append(deletions, key) } // Cache the mutated storage slots until commit if storage == nil { + s.db.storagesLock.Lock() if storage = s.db.storages[s.addrHash]; storage == nil { storage = make(map[common.Hash][]byte) s.db.storages[s.addrHash] = storage } + s.db.storagesLock.Unlock() } - khash := crypto.HashData(s.db.hasher, key[:]) + khash := crypto.HashData(hasher, key[:]) storage[khash] = encoded // encoded will be nil if it's deleted // Cache the original value of mutated storage slots if origin == nil { + s.db.storagesLock.Lock() if origin = s.db.storagesOrigin[s.address]; origin == nil { origin = make(map[common.Hash][]byte) s.db.storagesOrigin[s.address] = origin } + s.db.storagesLock.Unlock() } // Track the original value of slot only if it's mutated first time if _, ok := origin[khash]; !ok { @@ -369,7 +414,7 @@ func (s *stateObject) updateTrie() (Trie, error) { s.db.setError(err) return nil, err } - s.db.StorageDeleted += 1 + s.db.StorageDeleted.Add(1) } // If no slots were touched, issue a warning as we shouldn't have done all // the above work in the first place diff --git a/core/state/statedb.go b/core/state/statedb.go index 1b028a2844..0ef52a88f6 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -24,6 +24,7 @@ import ( "slices" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -96,10 +97,12 @@ type StateDB struct { // These maps hold the state changes (including the corresponding // original value) that occurred in this **block**. - accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding + accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding + accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding + storages map[common.Hash]map[common.Hash][]byte // The mutated slots in prefix-zero trimmed rlp format - accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format + storagesLock sync.Mutex // Mutex protecting the maps during concurrent updates/commits // This map holds 'live' objects, which will get modified while // processing a state transition. @@ -165,9 +168,9 @@ type StateDB struct { TrieDBCommits time.Duration AccountUpdated int - StorageUpdated int + StorageUpdated atomic.Int64 AccountDeleted int - StorageDeleted int + StorageDeleted atomic.Int64 // Testing hooks onCommit func(states *triestate.Set) // Hook invoked when commit is performed @@ -214,7 +217,8 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) { // commit phase, most of the needed data is already hot. func (s *StateDB) StartPrefetcher(namespace string) { if s.prefetcher != nil { - s.prefetcher.close() + s.prefetcher.terminate(false) + s.prefetcher.report() s.prefetcher = nil } if s.snap != nil { @@ -226,7 +230,8 @@ func (s *StateDB) StartPrefetcher(namespace string) { // from the gathered metrics. func (s *StateDB) StopPrefetcher() { if s.prefetcher != nil { - s.prefetcher.close() + s.prefetcher.terminate(false) + s.prefetcher.report() s.prefetcher = nil } } @@ -544,9 +549,6 @@ func (s *StateDB) GetTransientState(addr common.Address, key common.Hash) common // updateStateObject writes the given object to the trie. func (s *StateDB) updateStateObject(obj *stateObject) { - // Track the amount of time wasted on updating the account from the trie - defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) - // Encode the account and update the account trie addr := obj.Address() if err := s.trie.UpdateAccount(addr, &obj.data); err != nil { @@ -575,10 +577,6 @@ func (s *StateDB) updateStateObject(obj *stateObject) { // deleteStateObject removes the given object from the state trie. func (s *StateDB) deleteStateObject(addr common.Address) { - // Track the amount of time wasted on deleting the account from the trie - defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) - - // Delete the account from the trie if err := s.trie.DeleteAccount(addr); err != nil { s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err)) } @@ -743,13 +741,6 @@ func (s *StateDB) Copy() *StateDB { // in the middle of a transaction. state.accessList = s.accessList.Copy() state.transientStorage = s.transientStorage.Copy() - - // If there's a prefetcher running, make an inactive copy of it that can - // only access data but does not actively preload (since the user will not - // know that they need to explicitly terminate an active copy). - if s.prefetcher != nil { - state.prefetcher = s.prefetcher.copy() - } return state } @@ -820,7 +811,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { delete(s.accountsOrigin, obj.address) // Clear out any previously updated account data (may be recreated via a resurrect) delete(s.storagesOrigin, obj.address) // Clear out any previously updated storage data (may be recreated via a resurrect) } else { - obj.finalise(true) // Prefetch slots in the background + obj.finalise() s.markUpdate(addr) } // At this point, also ship the address off to the precacher. The precacher @@ -829,7 +820,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure } if s.prefetcher != nil && len(addressesToPrefetch) > 0 { - s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch) + if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil { + log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err) + } } // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() @@ -842,42 +835,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) - // If there was a trie prefetcher operating, it gets aborted and irrevocably - // modified after we start retrieving tries. Remove it from the statedb after - // this round of use. - // - // This is weird pre-byzantium since the first tx runs with a prefetcher and - // the remainder without, but pre-byzantium even the initial prefetcher is - // useless, so no sleep lost. - prefetcher := s.prefetcher + // If there was a trie prefetcher operating, terminate it async so that the + // individual storage tries can be updated as soon as the disk load finishes. if s.prefetcher != nil { + s.prefetcher.terminate(true) defer func() { - s.prefetcher.close() - s.prefetcher = nil + s.prefetcher.report() + s.prefetcher = nil // Pre-byzantium, unset any used up prefetcher }() } - // Although naively it makes sense to retrieve the account trie and then do - // the contract storage and account updates sequentially, that short circuits - // the account prefetcher. Instead, let's process all the storage updates - // first, giving the account prefetches just a few more milliseconds of time - // to pull useful data from disk. - start := time.Now() - for addr, op := range s.mutations { - if op.applied { - continue - } - if op.isDelete() { - continue - } - s.stateObjects[addr].updateRoot() + // Process all storage updates concurrently. The state object update root + // method will internally call a blocking trie fetch from the prefetcher, + // so there's no need to explicitly wait for the prefetchers to finish. + var ( + start = time.Now() + workers errgroup.Group + ) + if s.db.TrieDB().IsVerkle() { + // Whilst MPT storage tries are independent, Verkle has one single trie + // for all the accounts and all the storage slots merged together. The + // former can thus be simply parallelized, but updating the latter will + // need concurrency support within the trie itself. That's a TODO for a + // later time. + workers.SetLimit(1) } + for addr, op := range s.mutations { + if op.applied || op.isDelete() { + continue + } + obj := s.stateObjects[addr] // closure for the task runner below + workers.Go(func() error { + obj.updateRoot() + return nil + }) + } + workers.Wait() s.StorageUpdates += time.Since(start) // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. - if prefetcher != nil { - if trie := prefetcher.trie(common.Hash{}, s.originalRoot); trie != nil { + start = time.Now() + + if s.prefetcher != nil { + if trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot); err != nil { + log.Error("Failed to retrieve account pre-fetcher trie", "err", err) + } else if trie != nil { s.trie = trie } } @@ -913,8 +916,10 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { s.deleteStateObject(deletedAddr) s.AccountDeleted += 1 } - if prefetcher != nil { - prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs) + s.AccountUpdates += time.Since(start) + + if s.prefetcher != nil { + s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs) } // Track the amount of time wasted on hashing the account trie defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now()) @@ -1255,15 +1260,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er return common.Hash{}, err } accountUpdatedMeter.Mark(int64(s.AccountUpdated)) - storageUpdatedMeter.Mark(int64(s.StorageUpdated)) + storageUpdatedMeter.Mark(s.StorageUpdated.Load()) accountDeletedMeter.Mark(int64(s.AccountDeleted)) - storageDeletedMeter.Mark(int64(s.StorageDeleted)) + storageDeletedMeter.Mark(s.StorageDeleted.Load()) accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated)) accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted)) storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated)) storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted)) s.AccountUpdated, s.AccountDeleted = 0, 0 - s.StorageUpdated, s.StorageDeleted = 0, 0 + s.StorageUpdated.Store(0) + s.StorageDeleted.Store(0) // If snapshotting is enabled, update the snapshot tree with this new version if s.snap != nil { diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index c2a49417d4..7e08964e41 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -17,6 +17,7 @@ package state import ( + "errors" "sync" "github.com/ethereum/go-ethereum/common" @@ -27,6 +28,10 @@ import ( var ( // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. triePrefetchMetricsPrefix = "trie/prefetch/" + + // errTerminated is returned if a fetcher is attempted to be operated after it + // has already terminated. + errTerminated = errors.New("fetcher is already terminated") ) // triePrefetcher is an active prefetcher, which receives accounts or storage @@ -37,160 +42,126 @@ var ( type triePrefetcher struct { db Database // Database to fetch trie nodes through root common.Hash // Root hash of the account trie for metrics - fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies. fetchers map[string]*subfetcher // Subfetchers for each trie + term chan struct{} // Channel to signal interruption deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter - accountSkipMeter metrics.Meter accountWasteMeter metrics.Meter storageLoadMeter metrics.Meter storageDupMeter metrics.Meter - storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter } func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace - p := &triePrefetcher{ + return &triePrefetcher{ db: db, root: root, fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map + term: make(chan struct{}), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), - accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), - storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - return p } -// close iterates over all the subfetchers, aborts any that were left spinning -// and reports the stats to the metrics subsystem. -func (p *triePrefetcher) close() { +// terminate iterates over all the subfetchers and issues a terminateion request +// to all of them. Depending on the async parameter, the method will either block +// until all subfetchers spin down, or return immediately. +func (p *triePrefetcher) terminate(async bool) { + // Short circuit if the fetcher is already closed + select { + case <-p.term: + return + default: + } + // Termiante all sub-fetchers, sync or async, depending on the request for _, fetcher := range p.fetchers { - fetcher.abort() // safe to do multiple times - - if metrics.Enabled { - if fetcher.root == p.root { - p.accountLoadMeter.Mark(int64(len(fetcher.seen))) - p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.accountWasteMeter.Mark(int64(len(fetcher.seen))) - } else { - p.storageLoadMeter.Mark(int64(len(fetcher.seen))) - p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.storageWasteMeter.Mark(int64(len(fetcher.seen))) - } - } + fetcher.terminate(async) } - // Clear out all fetchers (will crash on a second call, deliberate) - p.fetchers = nil + close(p.term) } -// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data -// already loaded will be copied over, but no goroutines will be started. This -// is mostly used in the miner which creates a copy of it's actively mutated -// state to be sealed while it may further mutate the state. -func (p *triePrefetcher) copy() *triePrefetcher { - copy := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[string]Trie), // Active prefetchers use the fetches map - - deliveryMissMeter: p.deliveryMissMeter, - accountLoadMeter: p.accountLoadMeter, - accountDupMeter: p.accountDupMeter, - accountSkipMeter: p.accountSkipMeter, - accountWasteMeter: p.accountWasteMeter, - storageLoadMeter: p.storageLoadMeter, - storageDupMeter: p.storageDupMeter, - storageSkipMeter: p.storageSkipMeter, - storageWasteMeter: p.storageWasteMeter, - } - // If the prefetcher is already a copy, duplicate the data - if p.fetches != nil { - for root, fetch := range p.fetches { - if fetch == nil { - continue - } - copy.fetches[root] = p.db.CopyTrie(fetch) - } - return copy - } - // Otherwise we're copying an active fetcher, retrieve the current states - for id, fetcher := range p.fetchers { - copy.fetches[id] = fetcher.peek() - } - return copy -} - -// prefetch schedules a batch of trie items to prefetch. -func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) { - // If the prefetcher is an inactive one, bail out - if p.fetches != nil { +// report aggregates the pre-fetching and usage metrics and reports them. +func (p *triePrefetcher) report() { + if !metrics.Enabled { return } - // Active fetcher, schedule the retrievals + for _, fetcher := range p.fetchers { + fetcher.wait() // ensure the fetcher's idle before poking in its internals + + if fetcher.root == p.root { + p.accountLoadMeter.Mark(int64(len(fetcher.seen))) + p.accountDupMeter.Mark(int64(fetcher.dups)) + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.accountWasteMeter.Mark(int64(len(fetcher.seen))) + } else { + p.storageLoadMeter.Mark(int64(len(fetcher.seen))) + p.storageDupMeter.Mark(int64(fetcher.dups)) + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.storageWasteMeter.Mark(int64(len(fetcher.seen))) + } + } +} + +// prefetch schedules a batch of trie items to prefetch. After the prefetcher is +// closed, all the following tasks scheduled will not be executed and an error +// will be returned. +// +// prefetch is called from two locations: +// +// 1. Finalize of the state-objects storage roots. This happens at the end +// of every transaction, meaning that if several transactions touches +// upon the same contract, the parameters invoking this method may be +// repeated. +// 2. Finalize of the main account trie. This happens only once per block. +func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error { + // Ensure the subfetcher is still alive + select { + case <-p.term: + return errTerminated + default: + } id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { fetcher = newSubfetcher(p.db, p.root, owner, root, addr) p.fetchers[id] = fetcher } - fetcher.schedule(keys) + return fetcher.schedule(keys) } -// trie returns the trie matching the root hash, or nil if the prefetcher doesn't -// have it. -func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie { - // If the prefetcher is inactive, return from existing deep copies - id := p.trieID(owner, root) - if p.fetches != nil { - trie := p.fetches[id] - if trie == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - return p.db.CopyTrie(trie) - } - // Otherwise the prefetcher is active, bail if no trie was prefetched for this root - fetcher := p.fetchers[id] +// trie returns the trie matching the root hash, blocking until the fetcher of +// the given trie terminates. If no fetcher exists for the request, nil will be +// returned. +func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) (Trie, error) { + // Bail if no trie was prefetched for this root + fetcher := p.fetchers[p.trieID(owner, root)] if fetcher == nil { + log.Error("Prefetcher missed to load trie", "owner", owner, "root", root) p.deliveryMissMeter.Mark(1) - return nil + return nil, nil } - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - fetcher.abort() // safe to do multiple times - - trie := fetcher.peek() - if trie == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - return trie + // Subfetcher exists, retrieve its trie + return fetcher.peek(), nil } // used marks a batch of state items used to allow creating statistics as to -// how useful or wasteful the prefetcher is. +// how useful or wasteful the fetcher is. func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) { if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil { + fetcher.wait() // ensure the fetcher's idle before poking in its internals fetcher.used = used } } @@ -218,10 +189,9 @@ type subfetcher struct { tasks [][]byte // Items queued up for retrieval lock sync.Mutex // Lock protecting the task queue - wake chan struct{} // Wake channel if a new task is scheduled - stop chan struct{} // Channel to interrupt processing - term chan struct{} // Channel to signal interruption - copy chan chan Trie // Channel to request a copy of the current trie + wake chan struct{} // Wake channel if a new task is scheduled + stop chan struct{} // Channel to interrupt processing + term chan struct{} // Channel to signal interruption seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks @@ -240,7 +210,6 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo wake: make(chan struct{}, 1), stop: make(chan struct{}), term: make(chan struct{}), - copy: make(chan chan Trie), seen: make(map[string]struct{}), } go sf.loop() @@ -248,50 +217,61 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo } // schedule adds a batch of trie keys to the queue to prefetch. -func (sf *subfetcher) schedule(keys [][]byte) { +func (sf *subfetcher) schedule(keys [][]byte) error { + // Ensure the subfetcher is still alive + select { + case <-sf.term: + return errTerminated + default: + } // Append the tasks to the current queue sf.lock.Lock() sf.tasks = append(sf.tasks, keys...) sf.lock.Unlock() - // Notify the prefetcher, it's fine if it's already terminated + // Notify the background thread to execute scheduled tasks select { case sf.wake <- struct{}{}: + // Wake signal sent default: + // Wake signal not sent as a previous is already queued } + return nil } -// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it -// is currently. +// wait blocks until the subfetcher terminates. This method is used to block on +// an async termination before accessing internal fields from the fetcher. +func (sf *subfetcher) wait() { + <-sf.term +} + +// peek retrieves the fetcher's trie, populated with any pre-fetched data. The +// returned trie will be a shallow copy, so modifying it will break subsequent +// peeks for the original data. The method will block until all the scheduled +// data has been loaded and the fethcer terminated. func (sf *subfetcher) peek() Trie { - ch := make(chan Trie) - select { - case sf.copy <- ch: - // Subfetcher still alive, return copy from it - return <-ch - - case <-sf.term: - // Subfetcher already terminated, return a copy directly - if sf.trie == nil { - return nil - } - return sf.db.CopyTrie(sf.trie) - } + // Block until the fertcher terminates, then retrieve the trie + sf.wait() + return sf.trie } -// abort interrupts the subfetcher immediately. It is safe to call abort multiple -// times but it is not thread safe. -func (sf *subfetcher) abort() { +// terminate requests the subfetcher to stop accepting new tasks and spin down +// as soon as everything is loaded. Depending on the async parameter, the method +// will either block until all disk loads finish or return immediately. +func (sf *subfetcher) terminate(async bool) { select { case <-sf.stop: default: close(sf.stop) } + if async { + return + } <-sf.term } -// loop waits for new tasks to be scheduled and keeps loading them until it runs -// out of tasks or its underlying trie is retrieved for committing. +// loop loads newly-scheduled trie tasks as they are received and loads them, stopping +// when requested. func (sf *subfetcher) loop() { // No matter how the loop stops, signal anyone waiting that it's terminated defer close(sf.term) @@ -305,8 +285,6 @@ func (sf *subfetcher) loop() { } sf.trie = trie } else { - // The trie argument can be nil as verkle doesn't support prefetching - // yet. TODO FIX IT(rjl493456442), otherwise code will panic here. trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) @@ -318,48 +296,38 @@ func (sf *subfetcher) loop() { for { select { case <-sf.wake: - // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + // Execute all remaining tasks in single run sf.lock.Lock() tasks := sf.tasks sf.tasks = nil sf.lock.Unlock() - // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { - select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() - return - - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) - - default: - // No termination request yet, prefetch the next entry - if _, ok := sf.seen[string(task)]; ok { - sf.dups++ - } else { - if len(task) == common.AddressLength { - sf.trie.GetAccount(common.BytesToAddress(task)) - } else { - sf.trie.GetStorage(sf.addr, task) - } - sf.seen[string(task)] = struct{}{} - } + for _, task := range tasks { + if _, ok := sf.seen[string(task)]; ok { + sf.dups++ + continue } + if len(task) == common.AddressLength { + sf.trie.GetAccount(common.BytesToAddress(task)) + } else { + sf.trie.GetStorage(sf.addr, task) + } + sf.seen[string(task)] = struct{}{} } - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) - case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return + // Termination is requested, abort if no more tasks are pending. If + // there are some, exhaust them first. + sf.lock.Lock() + done := sf.tasks == nil + sf.lock.Unlock() + + if done { + return + } + // Some tasks are pending, loop and pick them up (that wake branch + // will be selected eventually, whilst stop remains closed to this + // branch will also run afterwards). } } } diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index a616adf98f..d6788fad99 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -19,7 +19,6 @@ package state import ( "math/big" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -46,68 +45,20 @@ func filledStateDB() *StateDB { return state } -func TestCopyAndClose(t *testing.T) { +func TestUseAfterTerminate(t *testing.T) { db := filledStateDB() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") - prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - time.Sleep(1 * time.Second) - a := prefetcher.trie(common.Hash{}, db.originalRoot) - prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - b := prefetcher.trie(common.Hash{}, db.originalRoot) - cpy := prefetcher.copy() - cpy.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - cpy.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - c := cpy.trie(common.Hash{}, db.originalRoot) - prefetcher.close() - cpy2 := cpy.copy() - cpy2.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - d := cpy2.trie(common.Hash{}, db.originalRoot) - cpy.close() - cpy2.close() - if a.Hash() != b.Hash() || a.Hash() != c.Hash() || a.Hash() != d.Hash() { - t.Fatalf("Invalid trie, hashes should be equal: %v %v %v %v", a.Hash(), b.Hash(), c.Hash(), d.Hash()) - } -} -func TestUseAfterClose(t *testing.T) { - db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") - skey := common.HexToHash("aaa") - prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - a := prefetcher.trie(common.Hash{}, db.originalRoot) - prefetcher.close() - b := prefetcher.trie(common.Hash{}, db.originalRoot) - if a == nil { - t.Fatal("Prefetching before close should not return nil") + if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil { + t.Errorf("Prefetch failed before terminate: %v", err) } - if b != nil { - t.Fatal("Trie after close should return nil") - } -} + prefetcher.terminate(false) -func TestCopyClose(t *testing.T) { - db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") - skey := common.HexToHash("aaa") - prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) - cpy := prefetcher.copy() - a := prefetcher.trie(common.Hash{}, db.originalRoot) - b := cpy.trie(common.Hash{}, db.originalRoot) - prefetcher.close() - c := prefetcher.trie(common.Hash{}, db.originalRoot) - d := cpy.trie(common.Hash{}, db.originalRoot) - if a == nil { - t.Fatal("Prefetching before close should not return nil") + if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil { + t.Errorf("Prefetch succeeded after terminate: %v", err) } - if b == nil { - t.Fatal("Copy trie should return nil") - } - if c != nil { - t.Fatal("Trie after close should return nil") - } - if d == nil { - t.Fatal("Copy trie should not return nil") + if _, err := prefetcher.trie(common.Hash{}, db.originalRoot); err != nil { + t.Errorf("Trie retrieval failed after terminate: %v", err) } }