eth/downloader: separate state sync from queue (#14460)

* eth/downloader: separate state sync from queue

Scheduling of state node downloads hogged the downloader queue lock when
new requests were scheduled. This caused timeouts for other requests.
With this change, state sync is fully independent of all other downloads
and doesn't involve the queue at all.

State sync is started and checked on in processContent. This is slightly
awkward because processContent doesn't have a select loop. Instead, the
queue is closed by an auxiliary goroutine when state sync fails. We
tried several alternatives to this but settled on the current approach
because it's the least amount of change overall.

Handling of the pivot block has changed slightly: the queue previously
prevented import of pivot block receipts before the state of the pivot
block was available. In this commit, the receipt will be imported before
the state. This causes an annoyance where the pivot block is committed
as fast block head even when state downloads fail. Stay tuned for more
updates in this area ;)

* eth/downloader: remove cancelTimeout channel

* eth/downloader: retry state requests on timeout

* eth/downloader: improve comment

* eth/downloader: mark peers idle when state sync is done

* eth/downloader: move pivot block splitting to processContent

This change also ensures that pivot block receipts aren't imported
before the pivot block itself.

* eth/downloader: limit state node retries

* eth/downloader: improve state node error handling and retry check

* eth/downloader: remove maxStateNodeRetries

It fails the sync too much.

* eth/downloader: remove last use of cancelCh in statesync.go

Fixes TestDeliverHeadersHang*Fast and (hopefully)
the weird cancellation behaviour at the end of fast sync.

* eth/downloader: fix leak in runStateSync

* eth/downloader: don't run processFullSyncContent in LightSync mode

* eth/downloader: improve comments

* eth/downloader: fix vet, megacheck

* eth/downloader: remove unrequested tasks anyway

* eth/downloader, trie: various polishes around duplicate items

This commit explicitly tracks duplicate and unexpected state
delieveries done against a trie Sync structure, also adding
there to import info logs.

The commit moves the db batch used to commit trie changes one
level deeper so its flushed after every node insertion. This
is needed to avoid a lot of duplicate retrievals caused by
inconsistencies between Sync internals and database. A better
approach is to track not-yet-written states in trie.Sync and
flush on commit, but I'm focuing on correctness first now.

The commit fixes a regression around pivot block fail count.
The counter previously was reset to 1 if and only if a sync
cycle progressed (inserted at least 1 entry to the database).
The current code reset it already if a node was delivered,
which is not stong enough, because unless it ends up written
to disk, an attacker can just loop and attack ad infinitum.

The commit also fixes a regression around state deliveries
and timeouts. The old downloader tracked if a delivery is
stale (none of the deliveries were requestedt), in which
case it didn't mark the node idle and did not send further
requests, since it signals a past timeout. The current code
did mark it idle even on stale deliveries, which eventually
caused two requests to be in flight at the same time, making
the deliveries always stale and mass duplicating retrievals
between multiple peers.

* eth/downloader: fix state request leak

This commit fixes the hang seen sometimes while doing the state
sync. The cause of the hang was a rare combination of events:
request state data from peer, peer drops and reconnects almost
immediately. This caused a new download task to be assigned to
the peer, overwriting the old one still waiting for a timeout,
which in turned leaked the requests out, never to be retried.
The fix is to ensure that a task assignment moves any pending
one back into the retry queue.

The commit also fixes a regression with peer dropping due to
stalls. The current code considered a peer stalling if they
timed out delivering 1 item. However, the downloader never
requests only one, the minimum is 2 (attempt to fine tune
estimated latency/bandwidth). The fix is simply to drop if
a timeout is detected at 2 items.

Apart from the above bugfixes, the commit contains some code
polishes I made while debugging the hang.

* core, eth, trie: support batched trie sync db writes

* trie: rename SyncMemCache to syncMemBatch
This commit is contained in:
Felix Lange 2017-06-22 14:26:03 +02:00 committed by Péter Szilágyi
parent 58a1e13e6d
commit 0042f13d47
9 changed files with 770 additions and 480 deletions

@ -59,10 +59,16 @@ func (s *StateSync) Missing(max int) []common.Hash {
}
// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// was committed to the memcache and also the index of an entry if processing of
// it failed.
func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) {
return (*trie.TrieSync)(s).Process(list, dbw)
func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) {
return (*trie.TrieSync)(s).Process(list)
}
// Commit flushes the data stored in the internal memcache out to persistent
// storage, returning th enumber of items written and any occurred error.
func (s *StateSync) Commit(dbw trie.DatabaseWriter) (int, error) {
return (*trie.TrieSync)(s).Commit(dbw)
}
// Pending returns the number of state entries currently pending for download.

@ -138,9 +138,12 @@ func testIterativeStateSync(t *testing.T, batch int) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two states are in sync
@ -168,9 +171,12 @@ func TestIterativeDelayedStateSync(t *testing.T) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(0)...)
}
// Cross check that the two states are in sync
@ -206,9 +212,12 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
results = append(results, trie.SyncResult{Hash: hash, Data: data})
}
// Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
@ -249,9 +258,12 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
@ -283,9 +295,12 @@ func TestIncompleteStateSync(t *testing.T) {
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
// Process each of the state nodes
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
for _, result := range results {
added = append(added, result.Hash)
}

@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)
@ -99,8 +98,9 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
@ -109,9 +109,9 @@ type Downloader struct {
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsStateDone uint64 // Number of state trie entries already pulled
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
@ -136,16 +136,18 @@ type Downloader struct {
notified int32
// Channels
newPeerCh chan *peer
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
// for stateFetcher
stateSyncStart chan *stateSync
trackStateReq chan *stateReq
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
dl := &Downloader{
mode: mode,
mux: mux,
queue: newQueue(stateDb),
queue: newQueue(),
peers: newPeerSet(),
stateDB: stateDb,
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader,
@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
insertReceipts: insertReceipts,
rollback: rollback,
dropPeer: dropPeer,
newPeerCh: make(chan *peer, 1),
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
stateCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
// for stateFetcher
stateSyncStart: make(chan *stateSync),
trackStateReq: make(chan *stateReq),
stateCh: make(chan dataPack),
}
go dl.qosTuner()
go dl.stateFetcher()
return dl
}
@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
// Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
pendingStates := uint64(d.queue.PendingNodeData())
// Lock the current stats and return the progress
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
PulledStates: d.syncStatsStateDone,
KnownStates: d.syncStatsStateDone + pendingStates,
PulledStates: d.syncStatsState.processed,
KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}
@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset()
d.peers.Reset()
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case <-ch:
default:
}
}
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for empty := false; !empty; {
select {
case <-ch:
@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
return d.spawnSync(origin+1,
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
)
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, td) },
}
if d.mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
err = d.spawnSync(fetchers)
if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
// If sync failed in the critical section, bump the fail counter.
atomic.AddUint32(&d.fsPivotFails, 1)
}
return err
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1)
go func() { defer wg.Done(); errc <- d.processContent() }()
errc := make(chan error, len(fetchers))
wg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers)+1; i++ {
if i == len(fetchers) {
for i := 0; i < len(fetchers); i++ {
if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.Cancel()
wg.Wait()
// If sync failed in the critical section, bump the fail counter
if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
atomic.AddUint32(&d.fsPivotFails, 1)
}
return err
}
@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
return nil, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
// available peers, reserving a chunk of nodes for each, waiting for delivery and
// also periodically checking for timeouts.
func (d *Downloader) fetchNodeData() error {
log.Debug("Downloading node state data")
var (
deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
// If the peer returned old-requested data, forgive
if err == trie.ErrNotRequested {
log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
return
}
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
d.Cancel()
return
}
// Processing succeeded, notify state fetcher of continuation
pending := d.queue.PendingNodeData()
if pending > 0 {
select {
case d.stateWakeCh <- true:
default:
}
}
d.syncStatsLock.Lock()
d.syncStatsStateDone += uint64(delivered)
syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
d.syncStatsLock.Unlock()
// If real database progress was made, reset any fast-sync pivot failure
if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
// Log a message to the user and return
if delivered > 0 {
log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
}
})
}
expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
log.Debug("Node state data download terminated", "err", err)
return err
}
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
origin += uint64(limit)
}
// Signal the content downloaders of the availablility of new tasks
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
@ -1351,73 +1293,153 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// processContent takes fetch results from the queue and tries to import them
// into the chain. The type of import operation will depend on the result contents.
func (d *Downloader) processContent() error {
pivot := d.queue.FastSyncPivot()
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.WaitResults()
if len(results) == 0 {
return nil // queue empty
return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
// Actually import the blocks
first, last := results[0].Header, results[len(results)-1].Header
if err := d.importBlockResults(results); err != nil {
return err
}
}
}
func (d *Downloader) importBlockResults(results []*fetchResult) error {
for len(results) != 0 {
// Check for any termination requests. This makes clean shutdown faster.
select {
case <-d.quitCh:
return errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnum", last.Number, "lasthash", last.Hash(),
)
for len(results) != 0 {
// Check for any termination requests
select {
case <-d.quitCh:
return errCancelContentProcessing
default:
blocks := make([]*types.Block, items)
for i, result := range results[:items] {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
if index, err := d.insertBlocks(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
// Shift the results to the next batch
results = results[items:]
}
return nil
}
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// Start syncing state of the reported head block.
// This should get us most of the state of the pivot block.
stateSync := d.syncState(latest.Root)
defer stateSync.Cancel()
go func() {
if err := stateSync.Wait(); err != nil {
d.queue.Close() // wake up WaitResults
}
}()
pivot := d.queue.FastSyncPivot()
for {
results := d.queue.WaitResults()
if len(results) == 0 {
return stateSync.Cancel()
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
P, beforeP, afterP := splitAroundPivot(pivot, results)
if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
return err
}
if P != nil {
stateSync.Cancel()
if err := d.commitPivotBlock(P); err != nil {
return err
}
// Retrieve the a batch of results to import
var (
blocks = make([]*types.Block, 0, maxResultsProcess)
receipts = make([]types.Receipts, 0, maxResultsProcess)
)
items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
for _, result := range results[:items] {
switch {
case d.mode == FullSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
case d.mode == FastSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
if result.Header.Number.Uint64() <= pivot {
receipts = append(receipts, result.Receipts)
}
}
}
// Try to process the results, aborting if there's an error
var (
err error
index int
)
switch {
case len(receipts) > 0:
index, err = d.insertReceipts(blocks, receipts)
if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
}
default:
index, err = d.insertBlocks(blocks)
}
if err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
// Shift the results to the next batch
results = results[items:]
}
if err := d.importBlockResults(afterP); err != nil {
return err
}
}
}
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
for _, result := range results {
num := result.Header.Number.Uint64()
switch {
case num < pivot:
before = append(before, result)
case num == pivot:
p = result
default:
after = append(after, result)
}
}
return p, before, after
}
func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
for len(results) != 0 {
// Check for any termination requests.
select {
case <-d.quitCh:
return errCancelContentProcessing
case <-stateSync.done:
if err := stateSync.Wait(); err != nil {
return err
}
default:
}
// Retrieve the a batch of results to import
items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting fast-sync blocks", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnumn", last.Number, "lasthash", last.Hash(),
)
blocks := make([]*types.Block, items)
receipts := make([]types.Receipts, items)
for i, result := range results[:items] {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
receipts[i] = result.Receipts
}
if index, err := d.insertReceipts(blocks, receipts); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
// Shift the results to the next batch
results = results[items:]
}
return nil
}
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
// Sync the pivot block state. This should complete reasonably quickly because
// we've already synced up to the reported head block state earlier.
if err := d.syncState(b.Root()).Wait(); err != nil {
return err
}
log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
return err
}
return d.commitHeadBlock(b.Hash())
}
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {

@ -38,8 +38,6 @@ var (
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
stateInMeter = metrics.NewMeter("eth/downloader/states/in")
stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
stateInMeter = metrics.NewMeter("eth/downloader/states/in")
stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
)

@ -30,6 +30,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
}
// FetchNodeData sends a node state data retrieval request to the remote peer.
func (p *peer) FetchNodeData(request *fetchRequest) error {
func (p *peer) FetchNodeData(hashes []common.Hash) error {
// Sanity check the protocol version
if p.version < 63 {
panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return errAlreadyFetching
}
p.stateStarted = time.Now()
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash := range request.Hashes {
hashes = append(hashes, hash)
}
go p.getNodeData(hashes)
return nil
}
@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
peers map[string]*peer
newPeerFeed event.Feed
lock sync.RWMutex
}
// newPeerSet creates a new peer set top track the active download sources.
@ -354,6 +349,10 @@ func newPeerSet() *peerSet {
}
}
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription {
return ps.newPeerFeed.Subscribe(ch)
}
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error {
// Register the new peer with some meaningful defaults
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[p.id]; ok {
ps.lock.Unlock()
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error {
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p
ps.lock.Unlock()
ps.newPeerFeed.Send(p)
return nil
}

@ -26,20 +26,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
)
var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
var (
errNoFetchesPending = errors.New("no fetches pending")
@ -94,15 +87,6 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order
stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority
stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
stateWriters int // [eth/63] Number of running state DB writer goroutines
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
@ -112,7 +96,7 @@ type queue struct {
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
func newQueue() *queue {
lock := new(sync.Mutex)
return &queue{
headerPendPool: make(map[string]*fetchRequest),
@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
stateTaskPool: make(map[common.Hash]int),
stateTaskQueue: prque.New(),
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
active: sync.NewCond(lock),
lock: lock,
@ -158,12 +138,6 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
q.stateTaskIndex = 0
q.stateTaskPool = make(map[common.Hash]int)
q.stateTaskQueue.Reset()
q.statePendPool = make(map[string]*fetchRequest)
q.stateScheduler = nil
q.resultCache = make([]*fetchResult, blockCacheLimit)
q.resultOffset = 0
}
@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
// PendingNodeData retrieves the number of node data entries pending for retrieval.
func (q *queue) PendingNodeData() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.pendingNodeDataLocked()
}
// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
// The caller must hold q.lock.
func (q *queue) pendingNodeDataLocked() int {
var n int
if q.stateScheduler != nil {
n = q.stateScheduler.Pending()
}
// Ensure that PendingNodeData doesn't return 0 until all state is written.
if q.stateWriters > 0 {
n++
}
return n
}
// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool {
return len(q.receiptPendPool) > 0
}
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.statePendPool)+q.stateWriters > 0
}
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
if q.stateScheduler != nil {
queued += q.stateScheduler.Pending()
}
return (queued + pending + cached) == 0
}
@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
// Pivoting point of the fast sync, switch the state retrieval to this
log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash)
q.stateTaskIndex = 0
q.stateTaskPool = make(map[common.Hash]int)
q.stateTaskQueue.Reset()
for _, req := range q.statePendPool {
req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
}
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
}
inserts = append(inserts, header)
q.headerHead = hash
from++
@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int {
if result == nil || result.Pending > 0 {
return i
}
// Special handling for the fast-sync pivot block:
if q.mode == FastSync {
bnum := result.Header.Number.Uint64()
if bnum == q.fastSyncPivot {
// If the state of the pivot block is not
// available yet, we cannot proceed and return 0.
//
// Stop before processing the pivot block to ensure that
// resultCache has space for fsHeaderForceVerify items. Not
// doing this could leave us unable to download the required
// amount of headers.
if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
// Stop before processing the pivot block to ensure that
// resultCache has space for fsHeaderForceVerify items. Not
// doing this could leave us unable to download the required
// amount of headers.
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
}
}
// If we're just the fast sync pivot, stop as well
// because the following batch needs different insertion.
// This simplifies handling the switchover in d.process.
if bnum == q.fastSyncPivot+1 && i > 0 {
return i
}
}
}
@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
return request
}
// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
// any previously failed download.
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
// Create a task generator to fetch status-fetch tasks if all schedules ones are done
generator := func(max int) {
if q.stateScheduler != nil {
for _, hash := range q.stateScheduler.Missing(max) {
q.stateTaskPool[hash] = q.stateTaskIndex
q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
q.stateTaskIndex++
}
}
}
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates)
}
// reserveHashes reserves a set of hashes for the given peer, skipping previously
// failed ones.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
// Short circuit if the peer's already downloading something (sanity check to
// not corrupt state)
if _, ok := pendPool[p.id]; ok {
return nil
}
// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
allowance := maxPending
if allowance > 0 {
for _, request := range pendPool {
allowance -= len(request.Hashes)
}
}
// If there's a task generator, ask it to fill our task queue
if taskGen != nil && taskQueue.Size() < allowance {
taskGen(allowance - taskQueue.Size())
}
if taskQueue.Empty() {
return nil
}
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
hash, priority := taskQueue.Pop()
if p.Lacks(hash.(common.Hash)) {
skip[hash.(common.Hash)] = int(priority)
} else {
send[hash.(common.Hash)] = int(priority)
}
}
// Merge all the skipped hashes back
for hash, index := range skip {
taskQueue.Push(hash, float32(index))
}
// Assemble and return the block download request
if len(send) == 0 {
return nil
}
request := &fetchRequest{
Peer: p,
Hashes: send,
Time: time.Now(),
}
pendPool[p.id] = request
return request
}
// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
// CancelNodeData aborts a node state data fetch request, returning all pending
// hashes to the task queue.
func (q *queue) CancelNodeData(request *fetchRequest) {
q.cancel(request, q.stateTaskQueue, q.statePendPool)
}
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) {
}
delete(q.receiptPendPool, peerId)
}
if request, ok := q.statePendPool[peerId]; ok {
for hash, index := range request.Hashes {
q.stateTaskQueue.Push(hash, float32(index))
}
delete(q.statePendPool, peerId)
}
}
// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
}
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
// The method returns the number of node state accepted from the delivery.
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
// If no data was retrieved, mark their hashes as unavailable for the origin peer
if len(data) == 0 {
for hash := range request.Hashes {
request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded data and verify each of them
errs := make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
// Skip any state trie entries that were not requested
hash := common.BytesToHash(crypto.Keccak256(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
continue
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{Hash: hash, Data: blob})
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.stateTaskQueue.Push(hash, float32(index))
}
if q.stateScheduler == nil {
return 0, errNoFetchesPending
}
// Run valid nodes through the trie download scheduler. It writes completed nodes to a
// batch, which is committed asynchronously. This may lead to over-fetches because the
// scheduler treats everything as written after Process has returned, but it's
// unlikely to be an issue in practice.
batch := q.stateDatabase.NewBatch()
progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
q.stateWriters += 1
go func() {
if procerr == nil {
nproc = len(process)
procerr = batch.Write()
}
// Return processing errors through the callback so the sync gets canceled. The
// number of writers is decremented prior to the call so PendingNodeData will
// return zero when the callback runs.
q.lock.Lock()
q.stateWriters -= 1
q.lock.Unlock()
callback(nproc, progressed, procerr)
// Wake up WaitResults after the state has been written because it might be
// waiting for completion of the pivot block's state download.
q.active.Signal()
}()
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
return len(process), nil
case len(errs) == len(request.Hashes):
return len(process), errStaleDelivery
default:
return len(process), fmt.Errorf("multiple failures: %v", errs)
}
}
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
}
q.fastSyncPivot = pivot
q.mode = mode
// If long running fast sync, also start up a head stateretrieval immediately
if mode == FastSync && pivot > 0 {
q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
}
}

449
eth/downloader/statesync.go Normal file

@ -0,0 +1,449 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"hash"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
)
// stateReq represents a batch of state fetch requests groupped together into
// a single data retrieval network packet.
type stateReq struct {
items []common.Hash // Hashes of the state items to download
tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peer // Peer that we're requesting from
response [][]byte // Response data of the peer (nil for timeouts)
}
// timedOut returns if this request timed out.
func (req *stateReq) timedOut() bool {
return req.response == nil
}
// stateSyncStats is a collection of progress stats to report during a state trie
// sync to RPC requests as well as to display in user logs.
type stateSyncStats struct {
processed uint64 // Number of state entries processed
duplicate uint64 // Number of state entries downloaded twice
unexpected uint64 // Number of non-requested state entries received
pending uint64 // Number of still pending state entries
}
// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
}
return s
}
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
for {
select {
case s := <-d.stateSyncStart:
for next := s; next != nil; {
next = d.runStateSync(next)
}
case <-d.stateCh:
// Ignore state responses while no sync is running.
case <-d.quitCh:
return
}
}
}
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
var (
active = make(map[string]*stateReq) // Currently in-flight requests
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(len(req.items))
}
}()
// Run the state sync.
go s.run()
defer s.Cancel()
for {
// Enable sending of the first buffered element if there is one.
var (
deliverReq *stateReq
deliverReqCh chan *stateReq
)
if len(finished) > 0 {
deliverReq = finished[0]
deliverReqCh = s.deliver
}
select {
// The stateSync lifecycle:
case next := <-d.stateSyncStart:
return next
case <-s.done:
return nil
// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq:
finished = append(finished[:0], finished[1:]...)
// Handle incoming state packs:
case pack := <-d.stateCh:
// Discard any data not requested (or previsouly timed out)
req := active[pack.PeerId()]
if req == nil {
log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.response = pack.(*statePack).states
finished = append(finished, req)
delete(active, pack.PeerId())
// Handle timed-out requests:
case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout.
// This can happen when the timeout and the delivery happens simultaneously,
// causing both pathways to trigger.
if active[req.peer.id] != req {
continue
}
// Move the timed out data back into the download queue
finished = append(finished, req)
delete(active, req.peer.id)
// Track outgoing state requests:
case req := <-d.trackStateReq:
// If an active request already exists for this peer, we have a problem. In
// theory the trie node schedule must never assign two requests to the same
// peer. In practive however, a peer might receive a request, disconnect and
// immediately reconnect before the previous times out. In this case the first
// request is never honored, alas we must not silently overwrite it, as that
// causes valid requests to go missing and sync to get stuck.
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Make sure the previous one doesn't get siletly lost
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
req.timer = time.AfterFunc(req.timeout, func() {
select {
case timeout <- req:
case <-s.done:
// Prevent leaking of timer goroutines in the unlikely case where a
// timer is fired just before exiting runStateSync.
}
})
active[req.peer.id] = req
}
}
}
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
sched *state.StateSync // State trie sync scheduler defining the tasks
keccak hash.Hash // Keccak256 hasher to verify deliveries with
tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
deliver chan *stateReq // Delivery channel multiplexing peer responses
cancel chan struct{} // Channel to signal a termination request
cancelOnce sync.Once // Ensures cancel only ever gets called once
done chan struct{} // Channel to signal termination completion
err error // Any error hit during sync (set before completion)
}
// stateTask represents a single trie node download taks, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type stateTask struct {
attempts map[string]struct{}
}
// newStateSync creates a new state trie download scheduler. This method does not
// yet start the sync. The user needs to call run to initiate.
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
sched: state.NewStateSync(root, d.stateDB),
keccak: sha3.NewKeccak256(),
tasks: make(map[common.Hash]*stateTask),
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
done: make(chan struct{}),
}
}
// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
s.err = s.loop()
close(s.done)
}
// Wait blocks until the sync is done or canceled.
func (s *stateSync) Wait() error {
<-s.done
return s.err
}
// Cancel cancels the sync and waits until it has shut down.
func (s *stateSync) Cancel() error {
s.cancelOnce.Do(func() { close(s.cancel) })
return s.Wait()
}
// loop is the main event loop of a state trie sync. It it responsible for the
// assignment of new tasks to peers (including sending it to them) as well as
// for the processing of inbound data. Note, that the loop does not directly
// receive data from peers, rather those are buffered up in the downloader and
// pushed here async. The reason is to decouple processing from data receipt
// and timeouts.
func (s *stateSync) loop() error {
// Listen for new peer events to assign tasks to them
newPeer := make(chan *peer, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
defer peerSub.Unsubscribe()
// Keep assigning new tasks until the sync completes or aborts
for s.sched.Pending() > 0 {
if err := s.assignTasks(); err != nil {
return err
}
// Tasks assigned, wait for something to happen
select {
case <-newPeer:
// New peer arrived, try to assign it download tasks
case <-s.cancel:
return errCancelStateFetch
case req := <-s.deliver:
// Response or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut())
if len(req.items) <= 2 && req.timedOut() {
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
s.d.dropPeer(req.peer.id)
}
// Process all the received blobs and check for stale delivery
stale, err := s.process(req)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
}
// The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
if !stale {
req.peer.SetNodeDataIdle(len(req.response))
}
}
}
return nil
}
// assignTasks attempts to assing new tasks to all idle peers, either from the
// batch currently being retried, or fetching new data from the trie sync itself.
func (s *stateSync) assignTasks() error {
// Iterate over all idle peers and try to assign them state fetches
peers, _ := s.d.peers.NodeDataIdlePeers()
for _, p := range peers {
// Assign a batch of fetches proportional to the estimated latency/bandwidth
cap := p.NodeDataCapacity(s.d.requestRTT())
req := &stateReq{peer: p, timeout: s.d.requestTTL()}
s.fillTasks(cap, req)
// If the peer was assigned tasks to fetch, send the network request
if len(req.items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
case <-s.cancel:
}
}
}
return nil
}
// fillTasks fills the given request object with a maximum of n state download
// tasks to send to the remote peer.
func (s *stateSync) fillTasks(n int, req *stateReq) {
// Refill available tasks from the scheduler.
if len(s.tasks) < n {
new := s.sched.Missing(n - len(s.tasks))
for _, hash := range new {
s.tasks[hash] = &stateTask{make(map[string]struct{})}
}
}
// Find tasks that haven't been tried with the request's peer.
req.items = make([]common.Hash, 0, n)
req.tasks = make(map[common.Hash]*stateTask, n)
for hash, t := range s.tasks {
// Stop when we've gathered enough requests
if len(req.items) == n {
break
}
// Skip any requests we've already tried from this peer
if _, ok := t.attempts[req.peer.id]; ok {
continue
}
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}
req.items = append(req.items, hash)
req.tasks[hash] = t
delete(s.tasks, hash)
}
}
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
func (s *stateSync) process(req *stateReq) (bool, error) {
// Collect processing stats and update progress if valid data was received
processed, written, duplicate, unexpected := 0, 0, 0, 0
defer func(start time.Time) {
if processed+written+duplicate+unexpected > 0 {
s.updateStats(processed, written, duplicate, unexpected, time.Since(start))
}
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
progress, stale := false, len(req.response) > 0
for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
switch err {
case nil:
processed++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
if prog {
progress = true
}
// If the node delivered a requested item, mark the delivery non-stale
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
stale = false
}
}
// If some data managed to hit the database, flush and reset failure counters
if progress {
// Flush any accumulated data out to disk
batch := s.d.stateDB.NewBatch()
count, err := s.sched.Commit(batch)
if err != nil {
return stale, err
}
if err := batch.Write(); err != nil {
return stale, err
}
written = count
// If we're inside the critical section, reset fail counter since we progressed
if atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.tasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(req.response) > 0 || req.timedOut() {
delete(task.attempts, req.peer.id)
}
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
return stale, nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
res := trie.SyncResult{Data: blob}
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Sum(res.Hash[:0])
committed, _, err := s.sched.Process([]trie.SyncResult{res})
return committed, res.Hash, err
}
// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) {
s.d.syncStatsLock.Lock()
defer s.d.syncStatsLock.Unlock()
s.d.syncStatsState.pending = uint64(s.sched.Pending())
s.d.syncStatsState.processed += uint64(processed)
s.d.syncStatsState.duplicate += uint64(duplicate)
s.d.syncStatsState.unexpected += uint64(unexpected)
log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
}

@ -28,6 +28,10 @@ import (
// node it did not request.
var ErrNotRequested = errors.New("not requested")
// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
// node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash common.Hash // Hash of the node data content to retrieve
@ -48,6 +52,21 @@ type SyncResult struct {
Data []byte // Data content of the retrieved node
}
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items
order []common.Hash // Order of completion to prevent out-of-order data loss
}
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
return &syncMemBatch{
batch: make(map[common.Hash][]byte),
order: make([]common.Hash, 0, 256),
}
}
// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
// leaf node. It's used by state syncing to check if the leaf node requires some
// further data syncing.
@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type TrieSync struct {
database DatabaseReader
database DatabaseReader // Persistent database to check for existing entries
membatch *syncMemBatch // Memory buffer to avoid frequest database writes
requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
}
@ -66,6 +86,7 @@ type TrieSync struct {
func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync {
ts := &TrieSync{
database: database,
membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(),
}
@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c
if root == emptyRoot {
return
}
if _, ok := s.membatch.batch[root]; ok {
return
}
key := root.Bytes()
blob, _ := s.database.Get(key)
if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash)
if hash == emptyState {
return
}
if _, ok := s.membatch.batch[hash]; ok {
return
}
if blob, _ := s.database.Get(hash.Bytes()); blob != nil {
return
}
@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash {
// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) {
func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
committed := false
for i, item := range results {
@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
if request == nil {
return committed, i, ErrNotRequested
}
if request.data != nil {
return committed, i, ErrAlreadyProcessed
}
// If the item is a raw entry request, commit directly
if request.raw {
request.data = item.Data
s.commit(request, dbw)
s.commit(request)
committed = true
continue
}
@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, i, err
}
if len(requests) == 0 && request.deps == 0 {
s.commit(request, dbw)
s.commit(request)
committed = true
continue
}
@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, 0, nil
}
// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning th enumber of items written and any occurred error.
func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) {
// Dump the membatch into a database dbw
for i, key := range s.membatch.order {
if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
return i, err
}
}
written := len(s.membatch.order)
// Drop the membatch data and return
s.membatch = newSyncMemBatch()
return written, nil
}
// Pending returns the number of state entries currently pending for download.
func (s *TrieSync) Pending() int {
return len(s.requests)
@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
// If the child references another node, resolve or schedule
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
hash := common.BytesToHash(node)
if _, ok := s.membatch.batch[hash]; ok {
continue
}
blob, _ := s.database.Get(node)
if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil {
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
hash: common.BytesToHash(node),
hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
return requests, nil
}
// commit finalizes a retrieval request and stores it into the database. If any
// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) {
// Write the node content to disk
if err := dbw.Put(req.hash[:], req.data); err != nil {
return err
}
func (s *TrieSync) commit(req *request) (err error) {
// Write the node content to the membatch
s.membatch.batch[req.hash] = req.data
s.membatch.order = append(s.membatch.order, req.hash)
delete(s.requests, req.hash)
// Check all parents for completion
for _, parent := range req.parents {
parent.deps--
if parent.deps == 0 {
if err := s.commit(parent, dbw); err != nil {
if err := s.commit(parent); err != nil {
return err
}
}

@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) {
}
results[i] = SyncResult{hash, data}
}
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two tries are in sync
@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
}
results[i] = SyncResult{hash, data}
}
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(10000)...)
}
// Cross check that the two tries are in sync
@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
for _, result := range results {
delete(queue, result.Hash)
}
@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(0)...)
}
// Cross check that the two tries are in sync
@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
// Process each of the trie nodes
if _, index, err := sched.Process(results, dstDb); err != nil {
if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
if index, err := sched.Commit(dstDb); err != nil {
t.Fatalf("failed to commit data #%d: %v", index, err)
}
for _, result := range results {
added = append(added, result.Hash)
}