From 0042f13d47700987e93e413be549b312e81854ac Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 22 Jun 2017 14:26:03 +0200 Subject: [PATCH] eth/downloader: separate state sync from queue (#14460) * eth/downloader: separate state sync from queue Scheduling of state node downloads hogged the downloader queue lock when new requests were scheduled. This caused timeouts for other requests. With this change, state sync is fully independent of all other downloads and doesn't involve the queue at all. State sync is started and checked on in processContent. This is slightly awkward because processContent doesn't have a select loop. Instead, the queue is closed by an auxiliary goroutine when state sync fails. We tried several alternatives to this but settled on the current approach because it's the least amount of change overall. Handling of the pivot block has changed slightly: the queue previously prevented import of pivot block receipts before the state of the pivot block was available. In this commit, the receipt will be imported before the state. This causes an annoyance where the pivot block is committed as fast block head even when state downloads fail. Stay tuned for more updates in this area ;) * eth/downloader: remove cancelTimeout channel * eth/downloader: retry state requests on timeout * eth/downloader: improve comment * eth/downloader: mark peers idle when state sync is done * eth/downloader: move pivot block splitting to processContent This change also ensures that pivot block receipts aren't imported before the pivot block itself. * eth/downloader: limit state node retries * eth/downloader: improve state node error handling and retry check * eth/downloader: remove maxStateNodeRetries It fails the sync too much. * eth/downloader: remove last use of cancelCh in statesync.go Fixes TestDeliverHeadersHang*Fast and (hopefully) the weird cancellation behaviour at the end of fast sync. * eth/downloader: fix leak in runStateSync * eth/downloader: don't run processFullSyncContent in LightSync mode * eth/downloader: improve comments * eth/downloader: fix vet, megacheck * eth/downloader: remove unrequested tasks anyway * eth/downloader, trie: various polishes around duplicate items This commit explicitly tracks duplicate and unexpected state delieveries done against a trie Sync structure, also adding there to import info logs. The commit moves the db batch used to commit trie changes one level deeper so its flushed after every node insertion. This is needed to avoid a lot of duplicate retrievals caused by inconsistencies between Sync internals and database. A better approach is to track not-yet-written states in trie.Sync and flush on commit, but I'm focuing on correctness first now. The commit fixes a regression around pivot block fail count. The counter previously was reset to 1 if and only if a sync cycle progressed (inserted at least 1 entry to the database). The current code reset it already if a node was delivered, which is not stong enough, because unless it ends up written to disk, an attacker can just loop and attack ad infinitum. The commit also fixes a regression around state deliveries and timeouts. The old downloader tracked if a delivery is stale (none of the deliveries were requestedt), in which case it didn't mark the node idle and did not send further requests, since it signals a past timeout. The current code did mark it idle even on stale deliveries, which eventually caused two requests to be in flight at the same time, making the deliveries always stale and mass duplicating retrievals between multiple peers. * eth/downloader: fix state request leak This commit fixes the hang seen sometimes while doing the state sync. The cause of the hang was a rare combination of events: request state data from peer, peer drops and reconnects almost immediately. This caused a new download task to be assigned to the peer, overwriting the old one still waiting for a timeout, which in turned leaked the requests out, never to be retried. The fix is to ensure that a task assignment moves any pending one back into the retry queue. The commit also fixes a regression with peer dropping due to stalls. The current code considered a peer stalling if they timed out delivering 1 item. However, the downloader never requests only one, the minimum is 2 (attempt to fine tune estimated latency/bandwidth). The fix is simply to drop if a timeout is detected at 2 items. Apart from the above bugfixes, the commit contains some code polishes I made while debugging the hang. * core, eth, trie: support batched trie sync db writes * trie: rename SyncMemCache to syncMemBatch --- core/state/sync.go | 12 +- core/state/sync_test.go | 25 +- eth/downloader/downloader.go | 336 ++++++++++++++------------ eth/downloader/metrics.go | 6 +- eth/downloader/peer.go | 25 +- eth/downloader/queue.go | 293 +---------------------- eth/downloader/statesync.go | 449 +++++++++++++++++++++++++++++++++++ trie/sync.go | 74 +++++- trie/sync_test.go | 30 ++- 9 files changed, 770 insertions(+), 480 deletions(-) create mode 100644 eth/downloader/statesync.go diff --git a/core/state/sync.go b/core/state/sync.go index 8456a810b7..2c29d706aa 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -59,10 +59,16 @@ func (s *StateSync) Missing(max int) []common.Hash { } // Process injects a batch of retrieved trie nodes data, returning if something -// was committed to the database and also the index of an entry if processing of +// was committed to the memcache and also the index of an entry if processing of // it failed. -func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) { - return (*trie.TrieSync)(s).Process(list, dbw) +func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { + return (*trie.TrieSync)(s).Process(list) +} + +// Commit flushes the data stored in the internal memcache out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *StateSync) Commit(dbw trie.DatabaseWriter) (int, error) { + return (*trie.TrieSync)(s).Commit(dbw) } // Pending returns the number of state entries currently pending for download. diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 43d146e3aa..108ebb3208 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -138,9 +138,12 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two states are in sync @@ -168,9 +171,12 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(0)...) } // Cross check that the two states are in sync @@ -206,9 +212,12 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{Hash: hash, Data: data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -249,9 +258,12 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} } @@ -283,9 +295,12 @@ func TestIncompleteStateSync(t *testing.T) { results[i] = trie.SyncResult{Hash: hash, Data: data} } // Process each of the state nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 839969f03a..e4d1392d0a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" ) @@ -99,8 +98,9 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section @@ -109,9 +109,9 @@ type Downloader struct { rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics - syncStatsChainOrigin uint64 // Origin block number where syncing started at - syncStatsChainHeight uint64 // Highest block number known when syncing started - syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsState stateSyncStats syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks @@ -136,16 +136,18 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer headerCh chan dataPack // [eth/62] Channel receiving inbound block headers bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + // for stateFetcher + stateSyncStart chan *stateSync + trackStateReq chan *stateReq + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + // Cancellation and termination cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs @@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he dl := &Downloader{ mode: mode, mux: mux, - queue: newQueue(stateDb), + queue: newQueue(), peers: newPeerSet(), + stateDB: stateDb, rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), hasHeader: hasHeader, @@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he insertReceipts: insertReceipts, rollback: rollback, dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), headerCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), - stateCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - stateWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), quitCh: make(chan struct{}), + // for stateFetcher + stateSyncStart: make(chan *stateSync), + trackStateReq: make(chan *stateReq), + stateCh: make(chan dataPack), } go dl.qosTuner() + go dl.stateFetcher() return dl } @@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he // of processed and the total number of known states are also returned. Otherwise // these are zero. func (d *Downloader) Progress() ethereum.SyncProgress { - // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks - pendingStates := uint64(d.queue.PendingNodeData()) - // Lock the current stats and return the progress d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() @@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress { StartingBlock: d.syncStatsChainOrigin, CurrentBlock: current, HighestBlock: d.syncStatsChainHeight, - PulledStates: d.syncStatsStateDone, - KnownStates: d.syncStatsStateDone + pendingStates, + PulledStates: d.syncStatsState.processed, + KnownStates: d.syncStatsState.processed + d.syncStatsState.pending, } } @@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case <-ch: default: } } - for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { for empty := false; !empty; { select { case <-ch: @@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, height) } - return d.spawnSync(origin+1, - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync - ) + + fetchers := []func() error{ + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, td) }, + } + if d.mode == FastSync { + fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + } else if d.mode == FullSync { + fetchers = append(fetchers, d.processFullSyncContent) + } + err = d.spawnSync(fetchers) + if err != nil && d.mode == FastSync && d.fsPivotLock != nil { + // If sync failed in the critical section, bump the fail counter. + atomic.AddUint32(&d.fsPivotFails, 1) + } + return err } // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { +func (d *Downloader) spawnSync(fetchers []func() error) error { var wg sync.WaitGroup - errc := make(chan error, len(fetchers)+1) - wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.processContent() }() + errc := make(chan error, len(fetchers)) + wg.Add(len(fetchers)) for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() } // Wait for the first error, then terminate the others. var err error - for i := 0; i < len(fetchers)+1; i++ { - if i == len(fetchers) { + for i := 0; i < len(fetchers); i++ { + if i == len(fetchers)-1 { // Close the queue when all fetchers have exited. // This will cause the block processor to end when // it has processed the queue. @@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { d.queue.Close() d.Cancel() wg.Wait() - - // If sync failed in the critical section, bump the fail counter - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - atomic.AddUint32(&d.fsPivotFails, 1) - } return err } @@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { return nil, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } -// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any -// available peers, reserving a chunk of nodes for each, waiting for delivery and -// also periodically checking for timeouts. -func (d *Downloader) fetchNodeData() error { - log.Debug("Downloading node state data") - - var ( - deliver = func(packet dataPack) (int, error) { - start := time.Now() - return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { - // If the peer returned old-requested data, forgive - if err == trie.ErrNotRequested { - log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId()) - return - } - if err != nil { - // If the node data processing failed, the root hash is very wrong, abort - log.Error("State processing failed", "peer", packet.PeerId(), "err", err) - d.Cancel() - return - } - // Processing succeeded, notify state fetcher of continuation - pending := d.queue.PendingNodeData() - if pending > 0 { - select { - case d.stateWakeCh <- true: - default: - } - } - d.syncStatsLock.Lock() - d.syncStatsStateDone += uint64(delivered) - syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below - d.syncStatsLock.Unlock() - - // If real database progress was made, reset any fast-sync pivot failure - if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { - log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails)) - atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Log a message to the user and return - if delivered > 0 { - log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending) - } - }) - } - expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } - throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { - return d.queue.ReserveNodeData(p, count), false, nil - } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } - ) - err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, - d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, - d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states") - - log.Debug("Node state data download terminated", "err", err) - return err -} - // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. @@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Terminate header processing if we synced up if len(headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { origin += uint64(limit) } // Signal the content downloaders of the availablility of new tasks - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: @@ -1351,73 +1293,153 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } -// processContent takes fetch results from the queue and tries to import them -// into the chain. The type of import operation will depend on the result contents. -func (d *Downloader) processContent() error { - pivot := d.queue.FastSyncPivot() +// processFullSyncContent takes fetch results from the queue and imports them into the chain. +func (d *Downloader) processFullSyncContent() error { for { results := d.queue.WaitResults() if len(results) == 0 { - return nil // queue empty + return nil } if d.chainInsertHook != nil { d.chainInsertHook(results) } - // Actually import the blocks - first, last := results[0].Header, results[len(results)-1].Header + if err := d.importBlockResults(results); err != nil { + return err + } + } +} + +func (d *Downloader) importBlockResults(results []*fetchResult) error { + for len(results) != 0 { + // Check for any termination requests. This makes clean shutdown faster. + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header log.Debug("Inserting downloaded chain", "items", len(results), "firstnum", first.Number, "firsthash", first.Hash(), "lastnum", last.Number, "lasthash", last.Hash(), ) - for len(results) != 0 { - // Check for any termination requests - select { - case <-d.quitCh: - return errCancelContentProcessing - default: + blocks := make([]*types.Block, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.insertBlocks(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +// processFastSyncContent takes fetch results from the queue and writes them to the +// database. It also controls the synchronisation of state nodes of the pivot block. +func (d *Downloader) processFastSyncContent(latest *types.Header) error { + // Start syncing state of the reported head block. + // This should get us most of the state of the pivot block. + stateSync := d.syncState(latest.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil { + d.queue.Close() // wake up WaitResults + } + }() + + pivot := d.queue.FastSyncPivot() + for { + results := d.queue.WaitResults() + if len(results) == 0 { + return stateSync.Cancel() + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + P, beforeP, afterP := splitAroundPivot(pivot, results) + if err := d.commitFastSyncData(beforeP, stateSync); err != nil { + return err + } + if P != nil { + stateSync.Cancel() + if err := d.commitPivotBlock(P); err != nil { + return err } - // Retrieve the a batch of results to import - var ( - blocks = make([]*types.Block, 0, maxResultsProcess) - receipts = make([]types.Receipts, 0, maxResultsProcess) - ) - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - for _, result := range results[:items] { - switch { - case d.mode == FullSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - case d.mode == FastSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - if result.Header.Number.Uint64() <= pivot { - receipts = append(receipts, result.Receipts) - } - } - } - // Try to process the results, aborting if there's an error - var ( - err error - index int - ) - switch { - case len(receipts) > 0: - index, err = d.insertReceipts(blocks, receipts) - if err == nil && blocks[len(blocks)-1].NumberU64() == pivot { - log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash()) - index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) - } - default: - index, err = d.insertBlocks(blocks) - } - if err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain - } - // Shift the results to the next batch - results = results[items:] + } + if err := d.importBlockResults(afterP); err != nil { + return err } } } +func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + for _, result := range results { + num := result.Header.Number.Uint64() + switch { + case num < pivot: + before = append(before, result) + case num == pivot: + p = result + default: + after = append(after, result) + } + } + return p, before, after +} + +func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { + for len(results) != 0 { + // Check for any termination requests. + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err + } + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, items) + receipts := make([]types.Receipts, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.insertReceipts(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +func (d *Downloader) commitPivotBlock(result *fetchResult) error { + b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + // Sync the pivot block state. This should complete reasonably quickly because + // we've already synced up to the reported head block state earlier. + if err := d.syncState(b.Root()).Wait(); err != nil { + return err + } + log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) + if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + return err + } + return d.commitHeadBlock(b.Hash()) +} + // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 0d76c7dfd2..58764ccf06 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -38,8 +38,6 @@ var ( receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") - stateInMeter = metrics.NewMeter("eth/downloader/states/in") - stateReqTimer = metrics.NewTimer("eth/downloader/states/req") - stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") - stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 15a912f1f2..dc8b097728 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { } // FetchNodeData sends a node state data retrieval request to the remote peer. -func (p *peer) FetchNodeData(request *fetchRequest) error { +func (p *peer) FetchNodeData(hashes []common.Hash) error { // Sanity check the protocol version if p.version < 63 { panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) @@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return errAlreadyFetching } p.stateStarted = time.Now() - - // Convert the hash set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Hashes)) - for hash := range request.Hashes { - hashes = append(hashes, hash) - } go p.getNodeData(hashes) - return nil } @@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peer + newPeerFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. @@ -354,6 +349,10 @@ func newPeerSet() *peerSet { } } +func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription { + return ps.newPeerFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error { // Register the new peer with some meaningful defaults ps.lock.Lock() - defer ps.lock.Unlock() - if _, ok := ps.peers[p.id]; ok { + ps.lock.Unlock() return errAlreadyRegistered } if len(ps.peers) > 0 { @@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error { p.stateThroughput /= float64(len(ps.peers)) } ps.peers[p.id] = p + ps.lock.Unlock() + + ps.newPeerFeed.Send(p) return nil } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 855097c45a..8a7735d673 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,20 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var ( - blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently -) +var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download var ( errNoFetchesPending = errors.New("no fetches pending") @@ -94,15 +87,6 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order - stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority - stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for - statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator - stateWriters int // [eth/63] Number of running state DB writer goroutines - resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -112,7 +96,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(stateDb ethdb.Database) *queue { +func newQueue() *queue { lock := new(sync.Mutex) return &queue{ headerPendPool: make(map[string]*fetchRequest), @@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - stateTaskPool: make(map[common.Hash]int), - stateTaskQueue: prque.New(), - statePendPool: make(map[string]*fetchRequest), - stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), active: sync.NewCond(lock), lock: lock, @@ -158,12 +138,6 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - q.statePendPool = make(map[string]*fetchRequest) - q.stateScheduler = nil - q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 } @@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } -// PendingNodeData retrieves the number of node data entries pending for retrieval. -func (q *queue) PendingNodeData() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.pendingNodeDataLocked() -} - -// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval. -// The caller must hold q.lock. -func (q *queue) pendingNodeDataLocked() int { - var n int - if q.stateScheduler != nil { - n = q.stateScheduler.Pending() - } - // Ensure that PendingNodeData doesn't return 0 until all state is written. - if q.stateWriters > 0 { - n++ - } - return n -} - // InFlightHeaders retrieves whether there are header fetch requests currently // in flight. func (q *queue) InFlightHeaders() bool { @@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } -// InFlightNodeData retrieves whether there are node data entry fetch requests -// currently in flight. -func (q *queue) InFlightNodeData() bool { - q.lock.Lock() - defer q.lock.Unlock() - - return len(q.statePendPool)+q.stateWriters > 0 -} - -// Idle returns if the queue is fully idle or has some data still inside. This -// method is used by the tester to detect termination events. +// Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) - if q.stateScheduler != nil { - queued += q.stateScheduler.Pending() - } return (queued + pending + cached) == 0 } @@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } - if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { - // Pivoting point of the fast sync, switch the state retrieval to this - log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash) - - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - for _, req := range q.statePendPool { - req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear - } - - q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - } inserts = append(inserts, header) q.headerHead = hash from++ @@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int { if result == nil || result.Pending > 0 { return i } - // Special handling for the fast-sync pivot block: - if q.mode == FastSync { - bnum := result.Header.Number.Uint64() - if bnum == q.fastSyncPivot { - // If the state of the pivot block is not - // available yet, we cannot proceed and return 0. - // - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 { + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { return i } - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } - // If we're just the fast sync pivot, stop as well - // because the following batch needs different insertion. - // This simplifies handling the switchover in d.process. - if bnum == q.fastSyncPivot+1 && i > 0 { - return i } } } @@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { return request } -// ReserveNodeData reserves a set of node data hashes for the given peer, skipping -// any previously failed download. -func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { - // Create a task generator to fetch status-fetch tasks if all schedules ones are done - generator := func(max int) { - if q.stateScheduler != nil { - for _, hash := range q.stateScheduler.Missing(max) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - } - } - } - q.lock.Lock() - defer q.lock.Unlock() - - return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates) -} - -// reserveHashes reserves a set of hashes for the given peer, skipping previously -// failed ones. -// -// Note, this method expects the queue lock to be already held for writing. The -// reason the lock is not obtained in here is because the parameters already need -// to access the queue, so they already need a lock anyway. -func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { - // Short circuit if the peer's already downloading something (sanity check to - // not corrupt state) - if _, ok := pendPool[p.id]; ok { - return nil - } - // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - allowance := maxPending - if allowance > 0 { - for _, request := range pendPool { - allowance -= len(request.Hashes) - } - } - // If there's a task generator, ask it to fill our task queue - if taskGen != nil && taskQueue.Size() < allowance { - taskGen(allowance - taskQueue.Size()) - } - if taskQueue.Empty() { - return nil - } - // Retrieve a batch of hashes, skipping previously failed ones - send := make(map[common.Hash]int) - skip := make(map[common.Hash]int) - - for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { - hash, priority := taskQueue.Pop() - if p.Lacks(hash.(common.Hash)) { - skip[hash.(common.Hash)] = int(priority) - } else { - send[hash.(common.Hash)] = int(priority) - } - } - // Merge all the skipped hashes back - for hash, index := range skip { - taskQueue.Push(hash, float32(index)) - } - // Assemble and return the block download request - if len(send) == 0 { - return nil - } - request := &fetchRequest{ - Peer: p, - Hashes: send, - Time: time.Now(), - } - pendPool[p.id] = request - - return request -} - // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. @@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } -// CancelNodeData aborts a node state data fetch request, returning all pending -// hashes to the task queue. -func (q *queue) CancelNodeData(request *fetchRequest) { - q.cancel(request, q.stateTaskQueue, q.statePendPool) -} - // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() @@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } - if request, ok := q.statePendPool[peerId]; ok { - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - delete(q.statePendPool, peerId) - } } // ExpireHeaders checks for in flight requests that exceeded a timeout allowance, @@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) } -// ExpireNodeData checks for in flight node data requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) -} - // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // @@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } -// DeliverNodeData injects a node state data retrieval response into the queue. -// The method returns the number of node state accepted from the delivery. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the data was never requested - request := q.statePendPool[id] - if request == nil { - return 0, errNoFetchesPending - } - stateReqTimer.UpdateSince(request.Time) - delete(q.statePendPool, id) - - // If no data was retrieved, mark their hashes as unavailable for the origin peer - if len(data) == 0 { - for hash := range request.Hashes { - request.Peer.MarkLacking(hash) - } - } - // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) - process := []trie.SyncResult{} - for _, blob := range data { - // Skip any state trie entries that were not requested - hash := common.BytesToHash(crypto.Keccak256(blob)) - if _, ok := request.Hashes[hash]; !ok { - errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) - continue - } - // Inject the next state trie item into the processing queue - process = append(process, trie.SyncResult{Hash: hash, Data: blob}) - delete(request.Hashes, hash) - delete(q.stateTaskPool, hash) - } - // Return all failed or missing fetches to the queue - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - if q.stateScheduler == nil { - return 0, errNoFetchesPending - } - - // Run valid nodes through the trie download scheduler. It writes completed nodes to a - // batch, which is committed asynchronously. This may lead to over-fetches because the - // scheduler treats everything as written after Process has returned, but it's - // unlikely to be an issue in practice. - batch := q.stateDatabase.NewBatch() - progressed, nproc, procerr := q.stateScheduler.Process(process, batch) - q.stateWriters += 1 - go func() { - if procerr == nil { - nproc = len(process) - procerr = batch.Write() - } - // Return processing errors through the callback so the sync gets canceled. The - // number of writers is decremented prior to the call so PendingNodeData will - // return zero when the callback runs. - q.lock.Lock() - q.stateWriters -= 1 - q.lock.Unlock() - callback(nproc, progressed, procerr) - // Wake up WaitResults after the state has been written because it might be - // waiting for completion of the pivot block's state download. - q.active.Signal() - }() - - // If none of the data items were good, it's a stale delivery - switch { - case len(errs) == 0: - return len(process), nil - case len(errs) == len(request.Hashes): - return len(process), errStaleDelivery - default: - return len(process), fmt.Errorf("multiple failures: %v", errs) - } -} - // Prepare configures the result cache to allow accepting and caching inbound // fetch results. func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { @@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. } q.fastSyncPivot = pivot q.mode = mode - - // If long running fast sync, also start up a head stateretrieval immediately - if mode == FastSync && pivot > 0 { - q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) - } } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go new file mode 100644 index 0000000000..4e66120393 --- /dev/null +++ b/eth/downloader/statesync.go @@ -0,0 +1,449 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "fmt" + "hash" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" +) + +// stateReq represents a batch of state fetch requests groupped together into +// a single data retrieval network packet. +type stateReq struct { + items []common.Hash // Hashes of the state items to download + tasks map[common.Hash]*stateTask // Download tasks to track previous attempts + timeout time.Duration // Maximum round trip time for this to complete + timer *time.Timer // Timer to fire when the RTT timeout expires + peer *peer // Peer that we're requesting from + response [][]byte // Response data of the peer (nil for timeouts) +} + +// timedOut returns if this request timed out. +func (req *stateReq) timedOut() bool { + return req.response == nil +} + +// stateSyncStats is a collection of progress stats to report during a state trie +// sync to RPC requests as well as to display in user logs. +type stateSyncStats struct { + processed uint64 // Number of state entries processed + duplicate uint64 // Number of state entries downloaded twice + unexpected uint64 // Number of non-requested state entries received + pending uint64 // Number of still pending state entries +} + +// syncState starts downloading state with the given root hash. +func (d *Downloader) syncState(root common.Hash) *stateSync { + s := newStateSync(d, root) + select { + case d.stateSyncStart <- s: + case <-d.quitCh: + s.err = errCancelStateFetch + close(s.done) + } + return s +} + +// stateFetcher manages the active state sync and accepts requests +// on its behalf. +func (d *Downloader) stateFetcher() { + for { + select { + case s := <-d.stateSyncStart: + for next := s; next != nil; { + next = d.runStateSync(next) + } + case <-d.stateCh: + // Ignore state responses while no sync is running. + case <-d.quitCh: + return + } + } +} + +// runStateSync runs a state synchronisation until it completes or another root +// hash is requested to be switched over to. +func (d *Downloader) runStateSync(s *stateSync) *stateSync { + var ( + active = make(map[string]*stateReq) // Currently in-flight requests + finished []*stateReq // Completed or failed requests + timeout = make(chan *stateReq) // Timed out active requests + ) + defer func() { + // Cancel active request timers on exit. Also set peers to idle so they're + // available for the next sync. + for _, req := range active { + req.timer.Stop() + req.peer.SetNodeDataIdle(len(req.items)) + } + }() + // Run the state sync. + go s.run() + defer s.Cancel() + + for { + // Enable sending of the first buffered element if there is one. + var ( + deliverReq *stateReq + deliverReqCh chan *stateReq + ) + if len(finished) > 0 { + deliverReq = finished[0] + deliverReqCh = s.deliver + } + + select { + // The stateSync lifecycle: + case next := <-d.stateSyncStart: + return next + + case <-s.done: + return nil + + // Send the next finished request to the current sync: + case deliverReqCh <- deliverReq: + finished = append(finished[:0], finished[1:]...) + + // Handle incoming state packs: + case pack := <-d.stateCh: + // Discard any data not requested (or previsouly timed out) + req := active[pack.PeerId()] + if req == nil { + log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items()) + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.response = pack.(*statePack).states + + finished = append(finished, req) + delete(active, pack.PeerId()) + + // Handle timed-out requests: + case req := <-timeout: + // If the peer is already requesting something else, ignore the stale timeout. + // This can happen when the timeout and the delivery happens simultaneously, + // causing both pathways to trigger. + if active[req.peer.id] != req { + continue + } + // Move the timed out data back into the download queue + finished = append(finished, req) + delete(active, req.peer.id) + + // Track outgoing state requests: + case req := <-d.trackStateReq: + // If an active request already exists for this peer, we have a problem. In + // theory the trie node schedule must never assign two requests to the same + // peer. In practive however, a peer might receive a request, disconnect and + // immediately reconnect before the previous times out. In this case the first + // request is never honored, alas we must not silently overwrite it, as that + // causes valid requests to go missing and sync to get stuck. + if old := active[req.peer.id]; old != nil { + log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) + + // Make sure the previous one doesn't get siletly lost + finished = append(finished, old) + } + // Start a timer to notify the sync loop if the peer stalled. + req.timer = time.AfterFunc(req.timeout, func() { + select { + case timeout <- req: + case <-s.done: + // Prevent leaking of timer goroutines in the unlikely case where a + // timer is fired just before exiting runStateSync. + } + }) + active[req.peer.id] = req + } + } +} + +// stateSync schedules requests for downloading a particular state trie defined +// by a given state root. +type stateSync struct { + d *Downloader // Downloader instance to access and manage current peerset + + sched *state.StateSync // State trie sync scheduler defining the tasks + keccak hash.Hash // Keccak256 hasher to verify deliveries with + tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval + + deliver chan *stateReq // Delivery channel multiplexing peer responses + cancel chan struct{} // Channel to signal a termination request + cancelOnce sync.Once // Ensures cancel only ever gets called once + done chan struct{} // Channel to signal termination completion + err error // Any error hit during sync (set before completion) +} + +// stateTask represents a single trie node download taks, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type stateTask struct { + attempts map[string]struct{} +} + +// newStateSync creates a new state trie download scheduler. This method does not +// yet start the sync. The user needs to call run to initiate. +func newStateSync(d *Downloader, root common.Hash) *stateSync { + return &stateSync{ + d: d, + sched: state.NewStateSync(root, d.stateDB), + keccak: sha3.NewKeccak256(), + tasks: make(map[common.Hash]*stateTask), + deliver: make(chan *stateReq), + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the task assignment and response processing loop, blocking until +// it finishes, and finally notifying any goroutines waiting for the loop to +// finish. +func (s *stateSync) run() { + s.err = s.loop() + close(s.done) +} + +// Wait blocks until the sync is done or canceled. +func (s *stateSync) Wait() error { + <-s.done + return s.err +} + +// Cancel cancels the sync and waits until it has shut down. +func (s *stateSync) Cancel() error { + s.cancelOnce.Do(func() { close(s.cancel) }) + return s.Wait() +} + +// loop is the main event loop of a state trie sync. It it responsible for the +// assignment of new tasks to peers (including sending it to them) as well as +// for the processing of inbound data. Note, that the loop does not directly +// receive data from peers, rather those are buffered up in the downloader and +// pushed here async. The reason is to decouple processing from data receipt +// and timeouts. +func (s *stateSync) loop() error { + // Listen for new peer events to assign tasks to them + newPeer := make(chan *peer, 1024) + peerSub := s.d.peers.SubscribeNewPeers(newPeer) + defer peerSub.Unsubscribe() + + // Keep assigning new tasks until the sync completes or aborts + for s.sched.Pending() > 0 { + if err := s.assignTasks(); err != nil { + return err + } + // Tasks assigned, wait for something to happen + select { + case <-newPeer: + // New peer arrived, try to assign it download tasks + + case <-s.cancel: + return errCancelStateFetch + + case req := <-s.deliver: + // Response or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) + if len(req.items) <= 2 && req.timedOut() { + // 2 items are the minimum requested, if even that times out, we've no use of + // this peer at the moment. + log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) + s.d.dropPeer(req.peer.id) + } + // Process all the received blobs and check for stale delivery + stale, err := s.process(req) + if err != nil { + log.Warn("Node data write error", "err", err) + return err + } + // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) + if !stale { + req.peer.SetNodeDataIdle(len(req.response)) + } + } + } + return nil +} + +// assignTasks attempts to assing new tasks to all idle peers, either from the +// batch currently being retried, or fetching new data from the trie sync itself. +func (s *stateSync) assignTasks() error { + // Iterate over all idle peers and try to assign them state fetches + peers, _ := s.d.peers.NodeDataIdlePeers() + for _, p := range peers { + // Assign a batch of fetches proportional to the estimated latency/bandwidth + cap := p.NodeDataCapacity(s.d.requestRTT()) + req := &stateReq{peer: p, timeout: s.d.requestTTL()} + s.fillTasks(cap, req) + + // If the peer was assigned tasks to fetch, send the network request + if len(req.items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items)) + + select { + case s.d.trackStateReq <- req: + req.peer.FetchNodeData(req.items) + case <-s.cancel: + } + } + } + return nil +} + +// fillTasks fills the given request object with a maximum of n state download +// tasks to send to the remote peer. +func (s *stateSync) fillTasks(n int, req *stateReq) { + // Refill available tasks from the scheduler. + if len(s.tasks) < n { + new := s.sched.Missing(n - len(s.tasks)) + for _, hash := range new { + s.tasks[hash] = &stateTask{make(map[string]struct{})} + } + } + // Find tasks that haven't been tried with the request's peer. + req.items = make([]common.Hash, 0, n) + req.tasks = make(map[common.Hash]*stateTask, n) + for hash, t := range s.tasks { + // Stop when we've gathered enough requests + if len(req.items) == n { + break + } + // Skip any requests we've already tried from this peer + if _, ok := t.attempts[req.peer.id]; ok { + continue + } + // Assign the request to this peer + t.attempts[req.peer.id] = struct{}{} + req.items = append(req.items, hash) + req.tasks[hash] = t + delete(s.tasks, hash) + } +} + +// process iterates over a batch of delivered state data, injecting each item +// into a running state sync, re-queuing any items that were requested but not +// delivered. +func (s *stateSync) process(req *stateReq) (bool, error) { + // Collect processing stats and update progress if valid data was received + processed, written, duplicate, unexpected := 0, 0, 0, 0 + + defer func(start time.Time) { + if processed+written+duplicate+unexpected > 0 { + s.updateStats(processed, written, duplicate, unexpected, time.Since(start)) + } + }(time.Now()) + + // Iterate over all the delivered data and inject one-by-one into the trie + progress, stale := false, len(req.response) > 0 + + for _, blob := range req.response { + prog, hash, err := s.processNodeData(blob) + switch err { + case nil: + processed++ + case trie.ErrNotRequested: + unexpected++ + case trie.ErrAlreadyProcessed: + duplicate++ + default: + return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + } + if prog { + progress = true + } + // If the node delivered a requested item, mark the delivery non-stale + if _, ok := req.tasks[hash]; ok { + delete(req.tasks, hash) + stale = false + } + } + // If some data managed to hit the database, flush and reset failure counters + if progress { + // Flush any accumulated data out to disk + batch := s.d.stateDB.NewBatch() + + count, err := s.sched.Commit(batch) + if err != nil { + return stale, err + } + if err := batch.Write(); err != nil { + return stale, err + } + written = count + + // If we're inside the critical section, reset fail counter since we progressed + if atomic.LoadUint32(&s.d.fsPivotFails) > 1 { + log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) + atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block + } + } + // Put unfulfilled tasks back into the retry queue + npeers := s.d.peers.Len() + + for hash, task := range req.tasks { + // If the node did deliver something, missing items may be due to a protocol + // limit or a previous timeout + delayed delivery. Both cases should permit + // the node to retry the missing items (to avoid single-peer stalls). + if len(req.response) > 0 || req.timedOut() { + delete(task.attempts, req.peer.id) + } + // If we've requested the node too many times already, it may be a malicious + // sync where nobody has the right data. Abort. + if len(task.attempts) >= npeers { + return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + } + // Missing item, place into the retry queue. + s.tasks[hash] = task + } + return stale, nil +} + +// processNodeData tries to inject a trie node data blob delivered from a remote +// peer into the state trie, returning whether anything useful was written or any +// error occurred. +func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { + res := trie.SyncResult{Data: blob} + + s.keccak.Reset() + s.keccak.Write(blob) + s.keccak.Sum(res.Hash[:0]) + + committed, _, err := s.sched.Process([]trie.SyncResult{res}) + return committed, res.Hash, err +} + +// updateStats bumps the various state sync progress counters and displays a log +// message for the user to see. +func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) { + s.d.syncStatsLock.Lock() + defer s.d.syncStatsLock.Unlock() + + s.d.syncStatsState.pending = uint64(s.sched.Pending()) + s.d.syncStatsState.processed += uint64(processed) + s.d.syncStatsState.duplicate += uint64(duplicate) + s.d.syncStatsState.unexpected += uint64(unexpected) + + log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected) +} diff --git a/trie/sync.go b/trie/sync.go index 168501392c..9e8449431d 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -28,6 +28,10 @@ import ( // node it did not request. var ErrNotRequested = errors.New("not requested") +// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a +// node it already processed previously. +var ErrAlreadyProcessed = errors.New("already processed") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -48,6 +52,21 @@ type SyncResult struct { Data []byte // Data content of the retrieved node } +// syncMemBatch is an in-memory buffer of successfully downloaded but not yet +// persisted data items. +type syncMemBatch struct { + batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items + order []common.Hash // Order of completion to prevent out-of-order data loss +} + +// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. +func newSyncMemBatch() *syncMemBatch { + return &syncMemBatch{ + batch: make(map[common.Hash][]byte), + order: make([]common.Hash, 0, 256), + } +} + // TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a // leaf node. It's used by state syncing to check if the leaf node requires some // further data syncing. @@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type TrieSync struct { - database DatabaseReader + database DatabaseReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } @@ -66,6 +86,7 @@ type TrieSync struct { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, + membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), queue: prque.New(), } @@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c if root == emptyRoot { return } + if _, ok := s.membatch.batch[root]; ok { + return + } key := root.Bytes() blob, _ := s.database.Get(key) if local, err := decodeNode(key, blob, 0); local != nil && err == nil { @@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) if hash == emptyState { return } + if _, ok := s.membatch.batch[hash]; ok { + return + } if blob, _ := s.database.Get(hash.Bytes()); blob != nil { return } @@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, if request == nil { return committed, i, ErrNotRequested } + if request.data != nil { + return committed, i, ErrAlreadyProcessed + } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, 0, nil } +// Commit flushes the data stored in the internal membatch out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) { + // Dump the membatch into a database dbw + for i, key := range s.membatch.order { + if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { + return i, err + } + } + written := len(s.membatch.order) + + // Drop the membatch data and return + s.membatch = newSyncMemBatch() + return written, nil +} + // Pending returns the number of state entries currently pending for download. func (s *TrieSync) Pending() int { return len(s.requests) @@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // If the child references another node, resolve or schedule if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database + hash := common.BytesToHash(node) + if _, ok := s.membatch.batch[hash]; ok { + continue + } blob, _ := s.database.Get(node) if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil { continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ - hash: common.BytesToHash(node), + hash: hash, parents: []*request{req}, depth: child.depth, callback: req.callback, @@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { return requests, nil } -// commit finalizes a retrieval request and stores it into the database. If any +// commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { - // Write the node content to disk - if err := dbw.Put(req.hash[:], req.data); err != nil { - return err - } +func (s *TrieSync) commit(req *request) (err error) { + // Write the node content to the membatch + s.membatch.batch[req.hash] = req.data + s.membatch.order = append(s.membatch.order, req.hash) + delete(s.requests, req.hash) // Check all parents for completion for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent, dbw); err != nil { + if err := s.commit(parent); err != nil { return err } } diff --git a/trie/sync_test.go b/trie/sync_test.go index d778555b91..ec16a25bd9 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two tries are in sync @@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { delete(queue, result.Hash) } @@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) }