[R4R]: Redesign triePrefetcher to make it thread safe (#972)

* Redesign triePrefetcher to make it thread safe

There are 2 types of triePrefetcher instances:
1. Newly created triePrefetcher: it is key to doing trie prefetch to speed up the validation phase.
2. Copied triePrefetcher: it only copies the prefetched trie information; it won't actually do
   any prefetch at all, and the copied tries are all kept in p.fetches.

Here we try to improve the newly created one, to make it concurrency safe, while the copied one's
behavior stays unchanged (its logic is very simple).
As commented in the triePrefetcher struct, its APIs are not thread safe, so callers should make sure
the created triePrefetcher is used within a single goroutine.
As we are trying to improve triePrefetcher, we would use it concurrently, so it is necessary to
redesign it for concurrent access.

The design is simple:
** start a mainLoop to do all the work, APIs just send channel message.

Others:
** remove the metrics copy, since it is useless for copied triePrefetcher
** for trie(), only get subfetcher through channel to reduce the workload of mainloop

* some code enhancement for triePrefetcher redesign

* some fixup: rename, temporary trie chan for concurrent safe.

* fix review comments

* add some protection in case the trie prefetcher is already stopped

* fix review comments

** make close concurrent safe
** fix potential deadlock

* replace channel by RWMutex for a few triePrefetcher APIs

For APIs like trie(), copy(), and used(), it is simpler and more efficient to
use a RWMutex instead of channel communication,
since the mainLoop would be busy handling trie requests, while these trie requests
can be processed in parallel.

We would only keep prefetch and close within the mainLoop, since they could update
the fetchers.

* add lock for subfetcher.used access to make it concurrent safe

* no need to create channel for copied triePrefetcher

* fix trie_prefetcher_test.go

The trie prefetcher’s behavior has changed: prefetch() won't create a subfetcher immediately.
This is reasonable, but it breaks the UT, so fix the failed UT.
This commit is contained in:
setunapo 2022-07-07 10:00:09 +08:00 committed by GitHub
parent 552e404e8c
commit 8e74562179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 85 deletions

@ -18,19 +18,29 @@ package state
import (
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const abortChanSize = 64
const (
abortChanSize = 64
concurrentChanSize = 10
)
var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"
)
type prefetchMsg struct {
root common.Hash
accountHash common.Hash
keys [][]byte
}
// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
@ -42,8 +52,14 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
abortChan chan *subfetcher
closeChan chan struct{}
closed int32
closeMainChan chan struct{} // it is to inform the mainLoop
closeMainDoneChan chan struct{}
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return
abortChan chan *subfetcher
closeAbortChan chan struct{} // it is used to inform abortLoop
deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
@ -60,11 +76,15 @@ type triePrefetcher struct {
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeChan: make(chan struct{}),
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeAbortChan: make(chan struct{}),
closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
@ -77,20 +97,62 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
go p.mainLoop()
return p
}
func (p *triePrefetcher) abortLoop() {
func (p *triePrefetcher) mainLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeChan:
// drain fetcher channel
case pMsg := <-p.prefetchChan:
fetcher := p.fetchers[pMsg.root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash)
p.fetchersMutex.Lock()
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)
case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
p.fetchersMutex.Unlock()
// drain all the channels before quit the loop
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.prefetchChan:
default:
return
}
@ -99,37 +161,28 @@ func (p *triePrefetcher) abortLoop() {
}
}
func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeAbortChan:
return
}
}
}
// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
if atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
close(p.closeMainChan)
<-p.closeMainDoneChan // wait until all subfetcher are stopped
}
close(p.closeChan)
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
@ -137,35 +190,45 @@ func (p *triePrefetcher) close() {
// is mostly used in the miner which creates a copy of it's actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
copy := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
abortChan: make(chan *subfetcher),
closeChan: make(chan struct{}),
deliveryMissMeter: p.deliveryMissMeter,
accountLoadMeter: p.accountLoadMeter,
accountDupMeter: p.accountDupMeter,
accountSkipMeter: p.accountSkipMeter,
accountWasteMeter: p.accountWasteMeter,
storageLoadMeter: p.storageLoadMeter,
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
for root, fetch := range p.fetches {
copy.fetches[root] = p.db.CopyTrie(fetch)
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetches)),
}
return copy
// p.fetches is safe to be accessed outside of mainloop
// if the triePrefetcher is active, fetches will not be used in mainLoop
// otherwise, inactive triePrefetcher is readonly, it won't modify fetches
for root, fetch := range p.fetches {
fetcherCopied.fetches[root] = p.db.CopyTrie(fetch)
}
return fetcherCopied
}
// Otherwise we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
copy.fetches[root] = fetcher.peek()
select {
case <-p.closeMainChan:
// for closed trie prefetcher, the fetches should not be nil
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie),
}
return fetcherCopied
default:
p.fetchersMutex.RLock()
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetchers)),
}
// we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
p.fetchersMutex.RUnlock()
return fetcherCopied
}
return copy
}
// prefetch schedules a batch of trie items to prefetch.
@ -174,13 +237,10 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
if p.fetches != nil {
return
}
// Active fetcher, schedule the retrievals
fetcher := p.fetchers[root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, root, accountHash)
p.fetchers[root] = fetcher
select {
case <-p.closeMainChan: // skip closed trie prefetcher
case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}:
}
fetcher.schedule(keys)
}
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
@ -190,20 +250,25 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
if p.fetches != nil {
trie := p.fetches[root]
if trie == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
return p.db.CopyTrie(trie)
}
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
p.abortChan <- fetcher // safe to do multiple times
select {
case <-p.closeAbortChan:
case p.abortChan <- fetcher: // safe to do multiple times
}
trie := fetcher.peek()
if trie == nil {
@ -216,8 +281,23 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.used = used
if !metrics.EnabledExpensive {
return
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
select {
case <-p.closeMainChan:
default:
p.fetchersMutex.RLock()
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.lock.Lock()
fetcher.used = used
fetcher.lock.Unlock()
}
p.fetchersMutex.RUnlock()
}
}

@ -43,23 +43,33 @@ func filledStateDB() *StateDB {
return state
}
func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]byte, accountHash common.Hash) {
prefetcher.prefetch(root, keys, accountHash)
for {
if len(prefetcher.prefetchChan) == 0 {
return
}
time.Sleep(1 * time.Millisecond)
}
}
func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
time.Sleep(1 * time.Second)
a := prefetcher.trie(db.originalRoot)
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
b := prefetcher.trie(db.originalRoot)
cpy := prefetcher.copy()
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
c := cpy.trie(db.originalRoot)
prefetcher.close()
cpy2 := cpy.copy()
cpy2.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy2, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
d := cpy2.trie(db.originalRoot)
cpy.close()
cpy2.close()
@ -72,7 +82,7 @@ func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
a := prefetcher.trie(db.originalRoot)
prefetcher.close()
b := prefetcher.trie(db.originalRoot)
@ -88,7 +98,7 @@ func TestCopyClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy := prefetcher.copy()
a := prefetcher.trie(db.originalRoot)
b := cpy.trie(db.originalRoot)