Trie prefetch on state prefetch (#996)
* feature: do trie prefetch on state prefetch Currently, state prefetch just pre-executes the transactions and discards the results. It is helpful for increasing the snapshot cache hit rate. It would be more helpful if it could do trie prefetch at the same time, since it will preload the trie nodes and build the trie tree in advance. This patch implements that, by reusing the main trie prefetcher and doing finalize after each transaction is executed. * some code improvements for trie prefetch ** increase pendingSize before dispatching tasks ** use a throwaway StateDB for TriePrefetchInAdvance and remove the prefetcherLock ** remove the unnecessary drain operation in the trie prefetch mainloop; the trie prefetcher won't be used after close.
This commit is contained in:
parent
df3e1be9d3
commit
77c8372cc4
@ -1892,13 +1892,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
|
|||||||
// do Prefetch in a separate goroutine to avoid blocking the critical path
|
// do Prefetch in a separate goroutine to avoid blocking the critical path
|
||||||
|
|
||||||
// 1.do state prefetch for snapshot cache
|
// 1.do state prefetch for snapshot cache
|
||||||
throwaway := statedb.Copy()
|
throwaway := statedb.CopyDoPrefetch()
|
||||||
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)
|
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)
|
||||||
|
|
||||||
// 2.do trie prefetch for MPT trie node cache
|
// 2.do trie prefetch for MPT trie node cache
|
||||||
// it is for the big state trie tree, prefetch based on transaction's From/To address.
|
// it is for the big state trie tree, prefetch based on transaction's From/To address.
|
||||||
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
|
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
|
||||||
go statedb.TriePrefetchInAdvance(block, signer)
|
go throwaway.TriePrefetchInAdvance(block, signer)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Process block using the parent state as reference point
|
//Process block using the parent state as reference point
|
||||||
|
@ -237,10 +237,9 @@ func (s *StateDB) StopPrefetcher() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
|
func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
|
||||||
s.prefetcherLock.Lock()
|
// s is a temporary throw away StateDB, s.prefetcher won't be resetted to nil
|
||||||
prefetcher := s.prefetcher // s.prefetcher could be resetted to nil
|
// so no need to add lock for s.prefetcher
|
||||||
s.prefetcherLock.Unlock()
|
prefetcher := s.prefetcher
|
||||||
|
|
||||||
if prefetcher == nil {
|
if prefetcher == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -805,6 +804,17 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common
|
|||||||
// Copy creates a deep, independent copy of the state.
|
// Copy creates a deep, independent copy of the state.
|
||||||
// Snapshots of the copied state cannot be applied to the copy.
|
// Snapshots of the copied state cannot be applied to the copy.
|
||||||
func (s *StateDB) Copy() *StateDB {
|
func (s *StateDB) Copy() *StateDB {
|
||||||
|
return s.copyInternal(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It is mainly for state prefetcher to do trie prefetch right now.
|
||||||
|
func (s *StateDB) CopyDoPrefetch() *StateDB {
|
||||||
|
return s.copyInternal(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If doPrefetch is true, it tries to reuse the prefetcher, the copied StateDB will do active trie prefetch.
|
||||||
|
// otherwise, just do inactive copy trie prefetcher.
|
||||||
|
func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
|
||||||
// Copy all the basic fields, initialize the memory ones
|
// Copy all the basic fields, initialize the memory ones
|
||||||
state := &StateDB{
|
state := &StateDB{
|
||||||
db: s.db,
|
db: s.db,
|
||||||
@ -871,12 +881,12 @@ func (s *StateDB) Copy() *StateDB {
|
|||||||
state.accessList = s.accessList.Copy()
|
state.accessList = s.accessList.Copy()
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there's a prefetcher running, make an inactive copy of it that can
|
state.prefetcher = s.prefetcher
|
||||||
// only access data but does not actively preload (since the user will not
|
if s.prefetcher != nil && !doPrefetch {
|
||||||
// know that they need to explicitly terminate an active copy).
|
// If there's a prefetcher running, make an inactive copy of it that can
|
||||||
prefetcher := s.prefetcher
|
// only access data but does not actively preload (since the user will not
|
||||||
if prefetcher != nil {
|
// know that they need to explicitly terminate an active copy).
|
||||||
state.prefetcher = prefetcher.copy()
|
state.prefetcher = state.prefetcher.copy()
|
||||||
}
|
}
|
||||||
if s.snaps != nil {
|
if s.snaps != nil {
|
||||||
// In order for the miner to be able to use and make additions
|
// In order for the miner to be able to use and make additions
|
||||||
|
@ -172,15 +172,7 @@ func (p *triePrefetcher) mainLoop() {
|
|||||||
p.fetchersMutex.Lock()
|
p.fetchersMutex.Lock()
|
||||||
p.fetchers = nil
|
p.fetchers = nil
|
||||||
p.fetchersMutex.Unlock()
|
p.fetchersMutex.Unlock()
|
||||||
|
return
|
||||||
// drain all the channels before quit the loop
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-p.prefetchChan:
|
|
||||||
default:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -362,6 +354,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf
|
|||||||
|
|
||||||
// schedule adds a batch of trie keys to the queue to prefetch.
|
// schedule adds a batch of trie keys to the queue to prefetch.
|
||||||
func (sf *subfetcher) schedule(keys [][]byte) {
|
func (sf *subfetcher) schedule(keys [][]byte) {
|
||||||
|
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
|
||||||
// Append the tasks to the current queue
|
// Append the tasks to the current queue
|
||||||
sf.lock.Lock()
|
sf.lock.Lock()
|
||||||
sf.tasks = append(sf.tasks, keys...)
|
sf.tasks = append(sf.tasks, keys...)
|
||||||
@ -371,7 +364,6 @@ func (sf *subfetcher) schedule(keys [][]byte) {
|
|||||||
case sf.wake <- struct{}{}:
|
case sf.wake <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
atomic.AddUint32(&sf.pendingSize, uint32(len(keys)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sf *subfetcher) scheduleParallel(keys [][]byte) {
|
func (sf *subfetcher) scheduleParallel(keys [][]byte) {
|
||||||
|
@ -58,7 +58,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
|
|||||||
// No need to execute the first batch, since the main processor will do it.
|
// No need to execute the first batch, since the main processor will do it.
|
||||||
for i := 0; i < prefetchThread; i++ {
|
for i := 0; i < prefetchThread; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
newStatedb := statedb.Copy()
|
newStatedb := statedb.CopyDoPrefetch()
|
||||||
newStatedb.EnableWriteOnSharedStorage()
|
newStatedb.EnableWriteOnSharedStorage()
|
||||||
gaspool := new(GasPool).AddGas(block.GasLimit())
|
gaspool := new(GasPool).AddGas(block.GasLimit())
|
||||||
blockContext := NewEVMBlockContext(header, p.bc, nil)
|
blockContext := NewEVMBlockContext(header, p.bc, nil)
|
||||||
@ -100,7 +100,7 @@ func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce,
|
|||||||
for i := 0; i < prefetchThread; i++ {
|
for i := 0; i < prefetchThread; i++ {
|
||||||
go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
|
go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
|
||||||
idx := 0
|
idx := 0
|
||||||
newStatedb := statedb.Copy()
|
newStatedb := statedb.CopyDoPrefetch()
|
||||||
newStatedb.EnableWriteOnSharedStorage()
|
newStatedb.EnableWriteOnSharedStorage()
|
||||||
gaspool := new(GasPool).AddGas(gasLimit)
|
gaspool := new(GasPool).AddGas(gasLimit)
|
||||||
blockContext := NewEVMBlockContext(header, p.bc, nil)
|
blockContext := NewEVMBlockContext(header, p.bc, nil)
|
||||||
@ -153,5 +153,8 @@ func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool
|
|||||||
// Update the evm with the new transaction context.
|
// Update the evm with the new transaction context.
|
||||||
evm.Reset(NewEVMTxContext(msg), statedb)
|
evm.Reset(NewEVMTxContext(msg), statedb)
|
||||||
// Add addresses to access list if applicable
|
// Add addresses to access list if applicable
|
||||||
ApplyMessage(evm, msg, gaspool)
|
if _, err := ApplyMessage(evm, msg, gaspool); err == nil {
|
||||||
|
statedb.Finalise(true)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -900,7 +900,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
|
|||||||
txsPrefetch := txs.Copy()
|
txsPrefetch := txs.Copy()
|
||||||
tx := txsPrefetch.Peek()
|
tx := txsPrefetch.Peek()
|
||||||
txCurr := &tx
|
txCurr := &tx
|
||||||
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)
|
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr)
|
||||||
|
|
||||||
LOOP:
|
LOOP:
|
||||||
for {
|
for {
|
||||||
|
Loading…
Reference in New Issue
Block a user