Merge pull request #3216 from karalabe/fastsync-bigdb-tuning

core/state, eth/downloader, trie: reset fast-failure on progress
This commit is contained in:
Péter Szilágyi 2016-11-01 13:31:12 +02:00 committed by GitHub
commit f4d878f3d8
7 changed files with 275 additions and 197 deletions

@ -59,8 +59,10 @@ func (s *StateSync) Missing(max int) []common.Hash {
return (*trie.TrieSync)(s).Missing(max) return (*trie.TrieSync)(s).Missing(max)
} }
// Process injects a batch of retrieved trie nodes data. // Process injects a batch of retrieved trie nodes data, returning if something
func (s *StateSync) Process(list []trie.SyncResult) (int, error) { // was committed to the database and also the index of an entry if processing of
// it failed.
func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) {
return (*trie.TrieSync)(s).Process(list) return (*trie.TrieSync)(s).Process(list)
} }

@ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) {
} }
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(batch)...) queue = append(queue[:0], sched.Missing(batch)...)
@ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
} }
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[len(results):], sched.Missing(0)...) queue = append(queue[len(results):], sched.Missing(0)...)
@ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
results = append(results, trie.SyncResult{Hash: hash, Data: data}) results = append(results, trie.SyncResult{Hash: hash, Data: data})
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
@ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
} }
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, hash := range sched.Missing(0) { for _, hash := range sched.Missing(0) {
@ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) {
results[i] = trie.SyncResult{Hash: hash, Data: data} results[i] = trie.SyncResult{Hash: hash, Data: data}
} }
// Process each of the state nodes // Process each of the state nodes
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {

@ -67,9 +67,9 @@ var (
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
) )
var ( var (
@ -105,7 +105,7 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed peers *peerSet // Set of active peers from which download can proceed
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails int // Number of fast sync failures in the critical section fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
rttEstimate uint64 // Round trip time to target for download requests rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@ -361,7 +361,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden // Set the requested sync mode, unless it's forbidden
d.mode = mode d.mode = mode
if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials { if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
d.mode = FullSync d.mode = FullSync
} }
// Retrieve the origin peer and initiate the downloading process // Retrieve the origin peer and initiate the downloading process
@ -480,6 +480,11 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close() d.queue.Close()
d.cancel() d.cancel()
wg.Wait() wg.Wait()
// If sync failed in the critical section, bump the fail counter
if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
atomic.AddUint32(&d.fsPivotFails, 1)
}
return err return err
} }
@ -926,10 +931,10 @@ func (d *Downloader) fetchNodeData() error {
var ( var (
deliver = func(packet dataPack) (int, error) { deliver = func(packet dataPack) (int, error) {
start := time.Now() start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
// If the peer returned old-requested data, forgive // If the peer returned old-requested data, forgive
if err == trie.ErrNotRequested { if err == trie.ErrNotRequested {
glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId()) glog.V(logger.Debug).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
return return
} }
if err != nil { if err != nil {
@ -951,6 +956,11 @@ func (d *Downloader) fetchNodeData() error {
syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
d.syncStatsLock.Unlock() d.syncStatsLock.Unlock()
// If real database progress was made, reset any fast-sync pivot failure
if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
// Log a message to the user and return // Log a message to the user and return
if delivered > 0 { if delivered > 0 {
glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending) glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
@ -1177,7 +1187,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// If we're already past the pivot point, this could be an attack, thread carefully // If we're already past the pivot point, this could be an attack, thread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot { if rollback[len(rollback)-1].Number.Uint64() > pivot {
// If we didn't ever fail, lock in te pivot header (must! not! change!) // If we didn't ever fail, lock in te pivot header (must! not! change!)
if d.fsPivotFails == 0 { if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback { for _, header := range rollback {
if header.Number.Uint64() == pivot { if header.Number.Uint64() == pivot {
glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]) glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
@ -1185,7 +1195,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
} }
} }
} }
d.fsPivotFails++
} }
} }
}() }()

@ -37,25 +37,79 @@ import (
) )
var ( var (
testdb, _ = ethdb.NewMemDatabase()
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddress = crypto.PubkeyToAddress(testKey.PublicKey) testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
) )
// Reduce some of the parameters to make the tester faster. // Reduce some of the parameters to make the tester faster.
func init() { func init() {
MaxForkAncestry = uint64(10000) MaxForkAncestry = uint64(10000)
blockCacheLimit = 1024 blockCacheLimit = 1024
fsCriticalTrials = 10
}
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
downloader *Downloader
genesis *types.Block // Genesis blocks used by the tester and peers
stateDb ethdb.Database // Database used by the tester for syncing from peers
peerDb ethdb.Database // Database of the peers containing all data
ownHashes []common.Hash // Hash chain belonging to the tester
ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
lock sync.RWMutex
}
// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
testdb, _ := ethdb.NewMemDatabase()
genesis := core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
tester := &downloadTester{
genesis: genesis,
peerDb: testdb,
ownHashes: []common.Hash{genesis.Hash()},
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
peerMissingStates: make(map[string]map[common.Hash]bool),
}
tester.stateDb, _ = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer)
return tester
} }
// makeChain creates a chain of n blocks starting at and including parent. // makeChain creates a chain of n blocks starting at and including parent.
// the returned hash chain is ordered head->parent. In addition, every 3rd block // the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block // contains a transaction and every 5th an uncle to allow testing correct block
// reassembly. // reassembly.
func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) { func (dl *downloadTester) makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) {
// Generate the block chain // Generate the block chain
blocks, receipts := core.GenerateChain(nil, parent, testdb, n, func(i int, block *core.BlockGen) { blocks, receipts := core.GenerateChain(nil, parent, dl.peerDb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed}) block.SetCoinbase(common.Address{seed})
// If a heavy chain is requested, delay blocks to raise difficulty // If a heavy chain is requested, delay blocks to raise difficulty
@ -63,7 +117,7 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei
block.OffsetTime(-1) block.OffsetTime(-1)
} }
// If the block number is multiple of 3, send a bonus transaction to the miner // If the block number is multiple of 3, send a bonus transaction to the miner
if parent == genesis && i%3 == 0 { if parent == dl.genesis && i%3 == 0 {
tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey) tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey)
if err != nil { if err != nil {
panic(err) panic(err)
@ -102,19 +156,19 @@ func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Recei
// makeChainFork creates two chains of length n, such that h1[:f] and // makeChainFork creates two chains of length n, such that h1[:f] and
// h2[:f] are different but have a common suffix of length n-f. // h2[:f] are different but have a common suffix of length n-f.
func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) { func (dl *downloadTester) makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) {
// Create the common suffix // Create the common suffix
hashes, headers, blocks, receipts := makeChain(n-f, 0, parent, parentReceipts, false) hashes, headers, blocks, receipts := dl.makeChain(n-f, 0, parent, parentReceipts, false)
// Create the forks, making the second heavyer if non balanced forks were requested // Create the forks, making the second heavyer if non balanced forks were requested
hashes1, headers1, blocks1, receipts1 := makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false) hashes1, headers1, blocks1, receipts1 := dl.makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false)
hashes1 = append(hashes1, hashes[1:]...) hashes1 = append(hashes1, hashes[1:]...)
heavy := false heavy := false
if !balanced { if !balanced {
heavy = true heavy = true
} }
hashes2, headers2, blocks2, receipts2 := makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy) hashes2, headers2, blocks2, receipts2 := dl.makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy)
hashes2 = append(hashes2, hashes[1:]...) hashes2 = append(hashes2, hashes[1:]...)
for hash, header := range headers { for hash, header := range headers {
@ -132,53 +186,6 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts,
return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2 return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2
} }
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
stateDb ethdb.Database
downloader *Downloader
ownHashes []common.Hash // Hash chain belonging to the tester
ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
lock sync.RWMutex
}
// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
tester := &downloadTester{
ownHashes: []common.Hash{genesis.Hash()},
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
peerMissingStates: make(map[string]map[common.Hash]bool),
}
tester.stateDb, _ = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer)
return tester
}
// terminate aborts any operations on the embedded downloader and releases all // terminate aborts any operations on the embedded downloader and releases all
// held resources. // held resources.
func (dl *downloadTester) terminate() { func (dl *downloadTester) terminate() {
@ -251,7 +258,7 @@ func (dl *downloadTester) headHeader() *types.Header {
return header return header
} }
} }
return genesis.Header() return dl.genesis.Header()
} }
// headBlock retrieves the current head block from the canonical chain. // headBlock retrieves the current head block from the canonical chain.
@ -266,7 +273,7 @@ func (dl *downloadTester) headBlock() *types.Block {
} }
} }
} }
return genesis return dl.genesis
} }
// headFastBlock retrieves the current head fast-sync block from the canonical chain. // headFastBlock retrieves the current head fast-sync block from the canonical chain.
@ -279,7 +286,7 @@ func (dl *downloadTester) headFastBlock() *types.Block {
return block return block
} }
} }
return genesis return dl.genesis
} }
// commitHeadBlock manually sets the head block to a given hash. // commitHeadBlock manually sets the head block to a given hash.
@ -351,7 +358,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
return len(blocks), nil return len(blocks), nil
} }
// insertReceipts injects a new batch of blocks into the simulated chain. // insertReceipts injects a new batch of receipts into the simulated chain.
func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) { func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) {
dl.lock.Lock() dl.lock.Lock()
defer dl.lock.Unlock() defer dl.lock.Unlock()
@ -586,7 +593,7 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
results := make([][]byte, 0, len(hashes)) results := make([][]byte, 0, len(hashes))
for _, hash := range hashes { for _, hash := range hashes {
if data, err := testdb.Get(hash.Bytes()); err == nil { if data, err := dl.peerDb.Get(hash.Bytes()); err == nil {
if !dl.peerMissingStates[id][hash] { if !dl.peerMissingStates[id][hash] {
results = append(results, data) results = append(results, data)
} }
@ -669,13 +676,13 @@ func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronis
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Synchronise with the peer and make sure all relevant data was retrieved // Synchronise with the peer and make sure all relevant data was retrieved
@ -694,13 +701,13 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) { func testThrottling(t *testing.T, protocol int, mode SyncMode) {
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Wrap the importer to allow stepping // Wrap the importer to allow stepping
@ -782,13 +789,13 @@ func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
func testForkedSync(t *testing.T, protocol int, mode SyncMode) { func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
@ -817,13 +824,13 @@ func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, Light
func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a long enough forked chain
common, fork := MaxHashFetch, 4*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a long enough forked chain
common, fork := MaxHashFetch, 4*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false)
tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB)
@ -853,13 +860,13 @@ func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, L
func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a long enough forked chain
common, fork := 13, int(MaxForkAncestry+17)
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a long enough forked chain
common, fork := 13, int(MaxForkAncestry+17)
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB)
@ -888,13 +895,13 @@ func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSyn
func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a long enough forked chain
common, fork := 13, int(MaxForkAncestry+17)
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a long enough forked chain
common, fork := 13, int(MaxForkAncestry+17)
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false)
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit
@ -958,6 +965,9 @@ func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) { func testCancel(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a small enough block chain to download and the tester // Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch { if targetBlocks >= MaxHashFetch {
@ -966,10 +976,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
if targetBlocks >= MaxHeaderFetch { if targetBlocks >= MaxHeaderFetch {
targetBlocks = MaxHeaderFetch - 15 targetBlocks = MaxHeaderFetch - 15
} }
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester := newTester()
defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
@ -999,13 +1006,13 @@ func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t,
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create various peers with various parts of the chain // Create various peers with various parts of the chain
targetPeers := 8 targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15 targetBlocks := targetPeers*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester := newTester()
defer tester.terminate()
for i := 0; i < targetPeers; i++ { for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i) id := fmt.Sprintf("peer #%d", i)
@ -1029,14 +1036,14 @@ func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t,
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
// Create peers of every type
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Create peers of every type
tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 62", 62, hashes, headers, blocks, nil)
tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts) tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts)
@ -1068,13 +1075,13 @@ func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, L
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Instrument the downloader to signal body requests // Instrument the downloader to signal body requests
@ -1094,7 +1101,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
// Validate the number of block bodies that should have been requested // Validate the number of block bodies that should have been requested
bodiesNeeded, receiptsNeeded := 0, 0 bodiesNeeded, receiptsNeeded := 0, 0
for _, block := range blocks { for _, block := range blocks {
if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
bodiesNeeded++ bodiesNeeded++
} }
} }
@ -1123,13 +1130,13 @@ func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 6
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt a full sync with an attacker feeding gapped headers // Attempt a full sync with an attacker feeding gapped headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
missing := targetBlocks / 2 missing := targetBlocks / 2
@ -1156,13 +1163,13 @@ func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 6
func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) } func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt a full sync with an attacker feeding shifted headers // Attempt a full sync with an attacker feeding shifted headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
delete(tester.peerHeaders["attack"], hashes[len(hashes)-2]) delete(tester.peerHeaders["attack"], hashes[len(hashes)-2])
@ -1188,13 +1195,13 @@ func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(
func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) } func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + fsMinFullBlocks
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt to sync with an attacker that feeds junk during the fast sync phase. // Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back. // This should result in the last fsHeaderSafetyNet headers being rolled back.
tester.newPeer("fast-attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("fast-attack", protocol, hashes, headers, blocks, receipts)
@ -1286,7 +1293,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(0, 0, tester.genesis, nil, false)
tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
@ -1333,7 +1340,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
for i, tt := range tests { for i, tt := range tests {
// Register a new peer and ensure it's presence // Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i) id := fmt.Sprintf("test %d", i)
if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil, nil); err != nil { if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err) t.Fatalf("test %d: failed to register new peer: %v", i, err)
} }
if _, ok := tester.peerHashes[id]; !ok { if _, ok := tester.peerHashes[id]; !ok {
@ -1342,7 +1349,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
// Simulate a synchronisation and check the required result // Simulate a synchronisation and check the required result
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
tester.downloader.Synchronise(id, genesis.Hash(), big.NewInt(1000), FullSync) tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
if _, ok := tester.peerHashes[id]; !ok != tt.drop { if _, ok := tester.peerHashes[id]; !ok != tt.drop {
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
} }
@ -1361,17 +1368,17 @@ func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync)
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1434,17 +1441,17 @@ func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64,
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a forked chain to simulate origin revertal // Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1510,17 +1517,17 @@ func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64,
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1587,17 +1594,17 @@ func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, L
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a small block chain // Create a small block chain
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil, false) hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1664,10 +1671,16 @@ func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64,
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil, false)
master := newTester()
defer master.terminate()
hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false)
fakeHeads := []*types.Header{{}, {}, {}, {}} fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ { for i := 0; i < 200; i++ {
tester := newTester() tester := newTester()
tester.peerDb = master.peerDb
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with // Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries. // a lot of unrequested header deliveries.
@ -1703,44 +1716,81 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
// Tests that if fast sync aborts in the critical section, it can restart a few // Tests that if fast sync aborts in the critical section, it can restart a few
// times before giving up. // times before giving up.
func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) } func TestFastCriticalRestartsFail63(t *testing.T) { testFastCriticalRestarts(t, 63, false) }
func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) } func TestFastCriticalRestartsFail64(t *testing.T) { testFastCriticalRestarts(t, 64, false) }
func TestFastCriticalRestartsCont63(t *testing.T) { testFastCriticalRestarts(t, 63, true) }
func TestFastCriticalRestartsCont64(t *testing.T) { testFastCriticalRestarts(t, 64, true) }
func testFastCriticalRestarts(t *testing.T, protocol int) { func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
t.Parallel()
// Create a large enough blockchin to actually fast sync on
targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
// Create a tester peer with the critical section state roots missing (force failures)
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
// Create a large enough blockchin to actually fast sync on
targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Create a tester peer with a critical section header missing (force failures)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1])
tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test
// Remove all possible pivot state roots and slow down replies (test failure resets later)
for i := 0; i < fsPivotInterval; i++ { for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
} }
tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section
// Synchronise with the peer a few times and make sure they fail until the retry limit // Synchronise with the peer a few times and make sure they fail until the retry limit
for i := 0; i < fsCriticalTrials; i++ { for i := 0; i < int(fsCriticalTrials)-1; i++ {
// Attempt a sync and ensure it fails properly // Attempt a sync and ensure it fails properly
if err := tester.sync("peer", nil, FastSync); err == nil { if err := tester.sync("peer", nil, FastSync); err == nil {
t.Fatalf("failing fast sync succeeded: %v", err) t.Fatalf("failing fast sync succeeded: %v", err)
} }
time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
if i == 0 { if i == 0 {
if tester.downloader.fsPivotLock == nil {
time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too
t.Fatalf("pivot block not locked in after critical section failure")
}
tester.lock.Lock() tester.lock.Lock()
tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]]
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0)
tester.lock.Unlock() tester.lock.Unlock()
} }
} }
// Return all nodes if we're testing fast sync progression
if progress {
tester.lock.Lock()
tester.peerMissingStates["peer"] = map[common.Hash]bool{}
tester.lock.Unlock()
if err := tester.sync("peer", nil, FastSync); err != nil {
t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err)
}
time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 {
t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1)
}
assertOwnChain(t, tester, targetBlocks+1)
} else {
if err := tester.sync("peer", nil, FastSync); err == nil {
t.Fatalf("succeeded to synchronise blocks in failed fast sync")
}
time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials {
t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials)
}
}
// Retry limit exhausted, downloader will switch to full sync, should succeed // Retry limit exhausted, downloader will switch to full sync, should succeed
if err := tester.sync("peer", nil, FastSync); err != nil { if err := tester.sync("peer", nil, FastSync); err != nil {
t.Fatalf("failed to synchronise blocks in slow sync: %v", err) t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
} }
assertOwnChain(t, tester, targetBlocks+1) // Note, we can't assert the chain here because the test asserter assumes sync
// completed using a single mode of operation, whereas fast-then-slow can result
// in arbitrary intermediate state that's not cleanly verifiable.
} }

@ -362,20 +362,20 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
// Make sure chain order is honoured and preserved throughout // Make sure chain order is honoured and preserved throughout
hash := header.Hash() hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from { if header.Number == nil || header.Number.Uint64() != from {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from) glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
break break
} }
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4]) glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
break break
} }
// Make sure no duplicate requests are executed // Make sure no duplicate requests are executed
if _, ok := q.blockTaskPool[hash]; ok { if _, ok := q.blockTaskPool[hash]; ok {
glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4]) glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
continue continue
} }
if _, ok := q.receiptTaskPool[hash]; ok { if _, ok := q.receiptTaskPool[hash]; ok {
glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4]) glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
continue continue
} }
// Queue the header for content retrieval // Queue the header for content retrieval
@ -388,7 +388,16 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
} }
if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
// Pivoting point of the fast sync, retrieve the state tries // Pivoting point of the fast sync, switch the state retrieval to this
glog.V(logger.Debug).Infof("Switching state downloads to %d [%x…]", header.Number.Uint64(), header.Hash().Bytes()[:4])
q.stateTaskIndex = 0
q.stateTaskPool = make(map[common.Hash]int)
q.stateTaskQueue.Reset()
for _, req := range q.statePendPool {
req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
}
q.stateSchedLock.Lock() q.stateSchedLock.Lock()
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
@ -866,10 +875,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted := len(headers) == MaxHeaderFetch accepted := len(headers) == MaxHeaderFetch
if accepted { if accepted {
if headers[0].Number.Uint64() != request.From { if headers[0].Number.Uint64() != request.From {
glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From) glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
accepted = false accepted = false
} else if headers[len(headers)-1].Hash() != target { } else if headers[len(headers)-1].Hash() != target {
glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4]) glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
accepted = false accepted = false
} }
} }
@ -877,12 +886,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
for i, header := range headers[1:] { for i, header := range headers[1:] {
hash := header.Hash() hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want) glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
accepted = false accepted = false
break break
} }
if headers[i].Hash() != header.ParentHash { if headers[i].Hash() != header.ParentHash {
glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4]) glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
accepted = false accepted = false
break break
} }
@ -1039,9 +1048,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
} }
// DeliverNodeData injects a node state data retrieval response into the queue. // DeliverNodeData injects a node state data retrieval response into the queue.
// The method returns the number of node state entries originally requested, and // The method returns the number of node state accepted from the delivery.
// the number of them actually accepted from the delivery. func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -1099,31 +1107,34 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch // deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler. // of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
// Wake up WaitResults after the state has been written because it // Wake up WaitResults after the state has been written because it
// might be waiting for the pivot block state to get completed. // might be waiting for the pivot block state to get completed.
defer q.active.Signal() defer q.active.Signal()
// Process results one by one to permit task fetches in between // Process results one by one to permit task fetches in between
progressed := false
for i, result := range results { for i, result := range results {
q.stateSchedLock.Lock() q.stateSchedLock.Lock()
if q.stateScheduler == nil { if q.stateScheduler == nil {
// Syncing aborted since this async delivery started, bail out // Syncing aborted since this async delivery started, bail out
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
callback(errNoFetchesPending, i) callback(i, progressed, errNoFetchesPending)
return return
} }
if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil { if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
// Processing a state result failed, bail out // Processing a state result failed, bail out
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
callback(err, i) callback(i, progressed, err)
return return
} else if prog {
progressed = true
} }
// Item processing succeeded, release the lock (temporarily) // Item processing succeeded, release the lock (temporarily)
q.stateSchedLock.Unlock() q.stateSchedLock.Unlock()
} }
callback(nil, len(results)) callback(len(results), progressed, nil)
} }
// Prepare configures the result cache to allow accepting and caching inbound // Prepare configures the result cache to allow accepting and caching inbound

@ -142,34 +142,40 @@ func (s *TrieSync) Missing(max int) []common.Hash {
return requests return requests
} }
// Process injects a batch of retrieved trie nodes data. // Process injects a batch of retrieved trie nodes data, returning if something
func (s *TrieSync) Process(results []SyncResult) (int, error) { // was committed to the database and also the index of an entry if processing of
// it failed.
func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
committed := false
for i, item := range results { for i, item := range results {
// If the item was not requested, bail out // If the item was not requested, bail out
request := s.requests[item.Hash] request := s.requests[item.Hash]
if request == nil { if request == nil {
return i, ErrNotRequested return committed, i, ErrNotRequested
} }
// If the item is a raw entry request, commit directly // If the item is a raw entry request, commit directly
if request.raw { if request.raw {
request.data = item.Data request.data = item.Data
s.commit(request, nil) s.commit(request, nil)
committed = true
continue continue
} }
// Decode the node data content and update the request // Decode the node data content and update the request
node, err := decodeNode(item.Hash[:], item.Data, 0) node, err := decodeNode(item.Hash[:], item.Data, 0)
if err != nil { if err != nil {
return i, err return committed, i, err
} }
request.data = item.Data request.data = item.Data
// Create and schedule a request for all the children nodes // Create and schedule a request for all the children nodes
requests, err := s.children(request, node) requests, err := s.children(request, node)
if err != nil { if err != nil {
return i, err return committed, i, err
} }
if len(requests) == 0 && request.deps == 0 { if len(requests) == 0 && request.deps == 0 {
s.commit(request, nil) s.commit(request, nil)
committed = true
continue continue
} }
request.deps += len(requests) request.deps += len(requests)
@ -177,7 +183,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
s.schedule(child) s.schedule(child)
} }
} }
return 0, nil return committed, 0, nil
} }
// Pending returns the number of state entries currently pending for download. // Pending returns the number of state entries currently pending for download.

@ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) {
} }
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(batch)...) queue = append(queue[:0], sched.Missing(batch)...)
@ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
} }
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[len(results):], sched.Missing(10000)...) queue = append(queue[len(results):], sched.Missing(10000)...)
@ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data}) results = append(results, SyncResult{hash, data})
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = make(map[common.Hash]struct{}) queue = make(map[common.Hash]struct{})
@ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
} }
} }
// Feed the retrieved results back and queue new tasks // Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {
@ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
queue = append(queue[:0], sched.Missing(0)...) queue = append(queue[:0], sched.Missing(0)...)
@ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data} results[i] = SyncResult{hash, data}
} }
// Process each of the trie nodes // Process each of the trie nodes
if index, err := sched.Process(results); err != nil { if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err) t.Fatalf("failed to process result #%d: %v", index, err)
} }
for _, result := range results { for _, result := range results {