eth/fetcher, trie: unit test reliability fixes (#23020)

Some tests take quite some time during exit, which I think causes
some appveyor fails like this:

    https://ci.appveyor.com/project/ethereum/go-ethereum/builds/39511210/job/xhom84eg2e4uulq3

One of the things that seem to take time during exit is waiting
(up to 100ms) for the syncbloom to close. This PR changes it to use
a channel, instead of looping with a 100ms wait.

This also includes some unrelated changes improving the reliability of
eth/fetcher tests, which fail a lot because they are time-dependent.
This commit is contained in:
Martin Holst Swende 2021-06-30 22:24:17 +02:00 committed by GitHub
parent 686b2884ee
commit c131e812ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 25 deletions

@ -833,7 +833,8 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
// internal state. // internal state.
func (f *BlockFetcher) forgetHash(hash common.Hash) { func (f *BlockFetcher) forgetHash(hash common.Hash) {
// Remove all pending announces and decrement DOS counters // Remove all pending announces and decrement DOS counters
for _, announce := range f.announced[hash] { if announceMap, ok := f.announced[hash]; ok {
for _, announce := range announceMap {
f.announces[announce.origin]-- f.announces[announce.origin]--
if f.announces[announce.origin] <= 0 { if f.announces[announce.origin] <= 0 {
delete(f.announces, announce.origin) delete(f.announces, announce.origin)
@ -843,6 +844,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {
if f.announceChangeHook != nil { if f.announceChangeHook != nil {
f.announceChangeHook(hash, false) f.announceChangeHook(hash, false)
} }
}
// Remove any pending fetches and decrement the DOS counters // Remove any pending fetches and decrement the DOS counters
if announce := f.fetching[hash]; announce != nil { if announce := f.fetching[hash]; announce != nil {
f.announces[announce.origin]-- f.announces[announce.origin]--

@ -698,6 +698,7 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
imported := make(chan interface{}) imported := make(chan interface{})
announced := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light { if light {
if header == nil { if header == nil {
@ -712,9 +713,23 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
} }
} }
// Announce a block with a bad number, check for immediate drop // Announce a block with a bad number, check for immediate drop
tester.fetcher.announceChangeHook = func(hash common.Hash, b bool) {
announced <- nil
}
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
verifyAnnounce := func() {
for i := 0; i < 2; i++ {
select {
case <-announced:
continue
case <-time.After(1 * time.Second):
t.Fatal("announce timeout")
return
}
}
}
verifyAnnounce()
verifyImportEvent(t, imported, false) verifyImportEvent(t, imported, false)
tester.lock.RLock() tester.lock.RLock()
dropped := tester.drops["bad"] dropped := tester.drops["bad"]
tester.lock.RUnlock() tester.lock.RUnlock()
@ -722,11 +737,11 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
if !dropped { if !dropped {
t.Fatalf("peer with invalid numbered announcement not dropped") t.Fatalf("peer with invalid numbered announcement not dropped")
} }
goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack) goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0) goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
// Make sure a good announcement passes without a drop // Make sure a good announcement passes without a drop
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher) tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
verifyAnnounce()
verifyImportEvent(t, imported, true) verifyImportEvent(t, imported, true)
tester.lock.RLock() tester.lock.RLock()

@ -50,6 +50,7 @@ type SyncBloom struct {
closer sync.Once closer sync.Once
closed uint32 closed uint32
pend sync.WaitGroup pend sync.WaitGroup
closeCh chan struct{}
} }
// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and // NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
@ -65,6 +66,7 @@ func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {
// Assemble the fast sync bloom and init it from previous sessions // Assemble the fast sync bloom and init it from previous sessions
b := &SyncBloom{ b := &SyncBloom{
bloom: bloom, bloom: bloom,
closeCh: make(chan struct{}),
} }
b.pend.Add(2) b.pend.Add(2)
go func() { go func() {
@ -125,17 +127,16 @@ func (b *SyncBloom) init(database ethdb.Iteratee) {
// meter periodically recalculates the false positive error rate of the bloom // meter periodically recalculates the false positive error rate of the bloom
// filter and reports it in a metric. // filter and reports it in a metric.
func (b *SyncBloom) meter() { func (b *SyncBloom) meter() {
// check every second
tick := time.NewTicker(1 * time.Second)
for { for {
select {
case <-tick.C:
// Report the current error ration. No floats, lame, scale it up. // Report the current error ration. No floats, lame, scale it up.
bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000))
case <-b.closeCh:
// Wait one second, but check termination more frequently
for i := 0; i < 10; i++ {
if atomic.LoadUint32(&b.closed) == 1 {
return return
} }
time.Sleep(100 * time.Millisecond)
}
} }
} }
@ -145,6 +146,7 @@ func (b *SyncBloom) Close() error {
b.closer.Do(func() { b.closer.Do(func() {
// Ensure the initializer is stopped // Ensure the initializer is stopped
atomic.StoreUint32(&b.closed, 1) atomic.StoreUint32(&b.closed, 1)
close(b.closeCh)
b.pend.Wait() b.pend.Wait()
// Wipe the bloom, but mark it "uninited" just in case someone attempts an access // Wipe the bloom, but mark it "uninited" just in case someone attempts an access