diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 7dfc419f4e..8088f16af9 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -106,7 +106,7 @@ func (b *beaconBackfiller) resume() { }() // If the downloader fails, report an error as in beacon chain mode there // should be no errors as long as the chain we're syncing to is valid. - if err := b.downloader.synchronise("", common.Hash{}, nil, nil, mode, true, b.started); err != nil { + if err := b.downloader.synchronise(mode, b.started); err != nil { log.Error("Beacon backfilling failed", "err", err) return } @@ -268,9 +268,9 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { return start, nil } -// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling +// fetchHeaders feeds skeleton headers to the downloader queue for scheduling // until sync errors or is finished. -func (d *Downloader) fetchBeaconHeaders(from uint64) error { +func (d *Downloader) fetchHeaders(from uint64) error { var head *types.Header _, tail, _, err := d.skeleton.Bounds() if err != nil { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c5b56e91dd..bb083260e4 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -41,17 +41,14 @@ import ( var ( MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Number of block headers to be fetched per retrieval request - MaxSkeletonSize = 128 // Number of header fetches needed for a skeleton assembly MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request - maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxHeadersProcess = 2048 // Number of header download results to import at once into the chain - maxResultsProcess = 2048 // Number of content download results to import at once into the chain - fullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) - lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain + fullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) - reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection - reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs + reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download @@ -59,24 +56,16 @@ var ( ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errUnsyncedPeer = errors.New("unsynced peer") - errNoPeers = errors.New("no peers to keep download active") + errBusy = errors.New("busy") + errBadPeer = errors.New("action from bad peer ignored") + errTimeout = errors.New("timeout") - errEmptyHeaderSet = errors.New("empty header set by peer") - errPeersUnavailable = errors.New("no peers available or all tried for download") - errInvalidAncestor = errors.New("retrieved ancestor is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)") - errTooOld = errors.New("peer's protocol version too old") - errNoAncestorFound = errors.New("no common ancestor found") errNoPivotHeader = errors.New("pivot header is not found") ErrMergeTransition = errors.New("legacy sync reached the merge") ) @@ -99,9 +88,8 @@ type Downloader struct { mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mux *event.TypeMux // Event multiplexer to announce sync operation events - genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT) - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database // Database to state sync into (and deduplicate via) @@ -118,11 +106,10 @@ type Downloader struct { badBlock badBlockFn // Reports a block as rejected by the chain // Status - synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing - synchronising atomic.Bool - notified atomic.Bool - committed atomic.Bool - ancientLimit uint64 // The maximum block number which can be regarded as ancient data. + synchronising atomic.Bool + notified atomic.Bool + committed atomic.Bool + ancientLimit uint64 // The maximum block number which can be regarded as ancient data. // Channels headerProcCh chan *headerTask // Channel to feed the header processor new tasks @@ -138,7 +125,6 @@ type Downloader struct { stateSyncStart chan *stateSync // Cancellation and termination - cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited. @@ -147,7 +133,6 @@ type Downloader struct { quitLock sync.Mutex // Lock to prevent double closes // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) @@ -326,39 +311,10 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// LegacySync tries to sync up our local block chain with a remote peer, both -// adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) LegacySync(id string, head common.Hash, td, ttd *big.Int, mode SyncMode) error { - err := d.synchronise(id, head, td, ttd, mode, false, nil) - - switch err { - case nil, errBusy, errCanceled: - return err - } - if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) || - errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) || - errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) { - log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) - if d.dropPeer == nil { - // The dropPeer method is nil when `--copydb` is used for a local copy. - // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored - log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id) - } else { - d.dropPeer(id) - } - return err - } - if errors.Is(err, ErrMergeTransition) { - return err // This is an expected fault, don't keep printing it in a spin-loop - } - log.Warn("Synchronisation failed, retrying", "err", err) - return err -} - // synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, mode SyncMode, beaconMode bool, beaconPing chan struct{}) error { +func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error { // The beacon header syncer is async. It will start this synchronization and // will continue doing other tasks. However, if synchronization needs to be // cancelled, the syncer needs to know if we reached the startup point (and @@ -373,10 +329,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, } }() } - // Mock out the synchronisation if testing - if d.synchroniseMock != nil { - return d.synchroniseMock(id, hash) - } // Make sure only one goroutine is ever allowed past this point at once if !d.synchronising.CompareAndSwap(false, true) { return errBusy @@ -424,7 +376,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, // Create cancel channel for aborting mid-flight and mark the master peer d.cancelLock.Lock() d.cancelCh = make(chan struct{}) - d.cancelPeer = id d.cancelLock.Unlock() defer d.Cancel() // No matter what, we can't leave the cancel channel open @@ -432,27 +383,19 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, // Atomically set the requested sync mode d.mode.Store(uint32(mode)) - // Retrieve the origin peer and initiate the downloading process - var p *peerConnection - if !beaconMode { // Beacon mode doesn't need a peer to sync from - p = d.peers.Peer(id) - if p == nil { - return errUnknownPeer - } - } if beaconPing != nil { close(beaconPing) } - return d.syncWithPeer(p, hash, td, ttd, beaconMode) + return d.syncToHead() } func (d *Downloader) getMode() SyncMode { return SyncMode(d.mode.Load()) } -// syncWithPeer starts a block synchronization based on the hash chain from the -// specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *big.Int, beaconMode bool) (err error) { +// syncToHead starts a block synchronization based on the hash chain from +// the specified head hash. +func (d *Downloader) syncToHead() (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -465,52 +408,39 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * }() mode := d.getMode() - if !beaconMode { - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) - } else { - log.Debug("Backfilling with the network", "mode", mode) - } + log.Debug("Backfilling with the network", "mode", mode) defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start))) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block var latest, pivot, final *types.Header - if !beaconMode { - // In legacy mode, use the master peer to retrieve the headers from - latest, pivot, err = d.fetchHead(p) - if err != nil { - return err - } - } else { - // In beacon mode, use the skeleton chain to retrieve the headers from - latest, _, final, err = d.skeleton.Bounds() - if err != nil { - return err - } - if latest.Number.Uint64() > uint64(fsMinFullBlocks) { - number := latest.Number.Uint64() - uint64(fsMinFullBlocks) + latest, _, final, err = d.skeleton.Bounds() + if err != nil { + return err + } + if latest.Number.Uint64() > uint64(fsMinFullBlocks) { + number := latest.Number.Uint64() - uint64(fsMinFullBlocks) - // Retrieve the pivot header from the skeleton chain segment but - // fallback to local chain if it's not found in skeleton space. - if pivot = d.skeleton.Header(number); pivot == nil { - _, oldest, _, _ := d.skeleton.Bounds() // error is already checked - if number < oldest.Number.Uint64() { - count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks - headers := d.readHeaderRange(oldest, count) - if len(headers) == count { - pivot = headers[len(headers)-1] - log.Warn("Retrieved pivot header from local", "number", pivot.Number, "hash", pivot.Hash(), "latest", latest.Number, "oldest", oldest.Number) - } + // Retrieve the pivot header from the skeleton chain segment but + // fallback to local chain if it's not found in skeleton space. + if pivot = d.skeleton.Header(number); pivot == nil { + _, oldest, _, _ := d.skeleton.Bounds() // error is already checked + if number < oldest.Number.Uint64() { + count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks + headers := d.readHeaderRange(oldest, count) + if len(headers) == count { + pivot = headers[len(headers)-1] + log.Warn("Retrieved pivot header from local", "number", pivot.Number, "hash", pivot.Hash(), "latest", latest.Number, "oldest", oldest.Number) } } - // Print an error log and return directly in case the pivot header - // is still not found. It means the skeleton chain is not linked - // correctly with local chain. - if pivot == nil { - log.Error("Pivot header is not found", "number", number) - return errNoPivotHeader - } + } + // Print an error log and return directly in case the pivot header + // is still not found. It means the skeleton chain is not linked + // correctly with local chain. + if pivot == nil { + log.Error("Pivot header is not found", "number", number) + return errNoPivotHeader } } // If no pivot block was returned, the head is below the min full block @@ -522,19 +452,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } height := latest.Number.Uint64() - var origin uint64 - if !beaconMode { - // In legacy mode, reach out to the network and find the ancestor - origin, err = d.findAncestor(p, latest) - if err != nil { - return err - } - } else { - // In beacon mode, use the skeleton chain for the ancestor lookup - origin, err = d.findBeaconAncestor() - if err != nil { - return err - } + // In beacon mode, use the skeleton chain for the ancestor lookup + origin, err := d.findBeaconAncestor() + if err != nil { + return err } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { @@ -577,24 +498,15 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // the ancientLimit through that. Otherwise calculate the ancient limit through // the advertised height of the remote peer. This most is mostly a fallback for // legacy networks, but should eventually be dropped. TODO(karalabe). - if beaconMode { - // Beacon sync, use the latest finalized block as the ancient limit - // or a reasonable height if no finalized block is yet announced. - if final != nil { - d.ancientLimit = final.Number.Uint64() - } else if height > fullMaxForkAncestry+1 { - d.ancientLimit = height - fullMaxForkAncestry - 1 - } else { - d.ancientLimit = 0 - } + // + // Beacon sync, use the latest finalized block as the ancient limit + // or a reasonable height if no finalized block is yet announced. + if final != nil { + d.ancientLimit = final.Number.Uint64() + } else if height > fullMaxForkAncestry+1 { + d.ancientLimit = height - fullMaxForkAncestry - 1 } else { - // Legacy sync, use the best announcement we have from the remote peer. - // TODO(karalabe): Drop this pathway. - if height > fullMaxForkAncestry+1 { - d.ancientLimit = height - fullMaxForkAncestry - 1 - } else { - d.ancientLimit = 0 - } + d.ancientLimit = 0 } frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. @@ -616,22 +528,13 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, mode) - if d.syncInitHook != nil { - d.syncInitHook(origin, height) - } - var headerFetcher func() error - if !beaconMode { - // In legacy mode, headers are retrieved from the network - headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) } - } else { - // In beacon mode, headers are served by the skeleton syncer - headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) } - } + + // In beacon mode, headers are served by the skeleton syncer fetchers := []func() error{ - headerFetcher, // Headers are always retrieved - func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync - func() error { return d.processHeaders(origin+1, td, ttd, beaconMode) }, + func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync + func() error { return d.processHeaders(origin + 1) }, } if mode == SnapSync { d.pivotLock.Lock() @@ -640,7 +543,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) } else if mode == FullSync { - fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) }) + fetchers = append(fetchers, func() error { return d.processFullSyncContent() }) } return d.spawnSync(fetchers) } @@ -719,540 +622,12 @@ func (d *Downloader) Terminate() { d.Cancel() } -// fetchHead retrieves the head header and prior pivot block (if available) from -// a remote peer. -func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) { - p.log.Debug("Retrieving remote chain head") - mode := d.getMode() - - // Request the advertised remote head block and wait for the response - latest, _ := p.peer.Head() - fetch := 1 - if mode == SnapSync { - fetch = 2 // head + pivot headers - } - headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true) - if err != nil { - return nil, nil, err - } - // Make sure the peer gave us at least one and at most the requested headers - if len(headers) == 0 || len(headers) > fetch { - return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch) - } - // The first header needs to be the head, validate against the request. If - // only 1 header was returned, make sure there's no pivot or there was not - // one requested. - head = headers[0] - if len(headers) == 1 { - if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) { - return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer) - } - p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0]) - return head, nil, nil - } - // At this point we have 2 headers in total and the first is the - // validated head of the chain. Check the pivot number and return, - pivot = headers[1] - if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) { - return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks)) - } - return head, pivot, nil -} - -// calculateRequestSpan calculates what headers to request from a peer when trying to determine the -// common ancestor. -// It returns parameters to be used for peer.RequestHeadersByNumber: -// -// from - starting block number -// count - number of headers to request -// skip - number of headers to skip -// -// and also returns 'max', the last block which is expected to be returned by the remote peers, -// given the (from,count,skip) -func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) { - var ( - from int - count int - MaxCount = MaxHeaderFetch / 16 - ) - // requestHead is the highest block that we will ask for. If requestHead is not offset, - // the highest block that we will get is 16 blocks back from head, which means we - // will fetch 14 or 15 blocks unnecessarily in the case the height difference - // between us and the peer is 1-2 blocks, which is most common - requestHead := int(remoteHeight) - 1 - if requestHead < 0 { - requestHead = 0 - } - // requestBottom is the lowest block we want included in the query - // Ideally, we want to include the one just below our own head - requestBottom := int(localHeight - 1) - if requestBottom < 0 { - requestBottom = 0 - } - totalSpan := requestHead - requestBottom - span := 1 + totalSpan/MaxCount - if span < 2 { - span = 2 - } - if span > 16 { - span = 16 - } - - count = 1 + totalSpan/span - if count > MaxCount { - count = MaxCount - } - if count < 2 { - count = 2 - } - from = requestHead - (count-1)*span - if from < 0 { - from = 0 - } - max := from + (count-1)*span - return int64(from), count, span - 1, uint64(max) -} - -// findAncestor tries to locate the common ancestor link of the local chain and -// a remote peers blockchain. In the general case when our node was in sync and -// on the correct chain, checking the top N links should already get us a match. -// In the rare scenario when we ended up on a long reorganisation (i.e. none of -// the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) { - // Figure out the valid ancestor range to prevent rewrite attacks - var ( - floor = int64(-1) - localHeight uint64 - remoteHeight = remoteHeader.Number.Uint64() - ) - mode := d.getMode() - switch mode { - case FullSync: - localHeight = d.blockchain.CurrentBlock().Number.Uint64() - case SnapSync: - localHeight = d.blockchain.CurrentSnapBlock().Number.Uint64() - default: - localHeight = d.lightchain.CurrentHeader().Number.Uint64() - } - p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) - - // Recap floor value for binary search - maxForkAncestry := fullMaxForkAncestry - if d.getMode() == LightSync { - maxForkAncestry = lightMaxForkAncestry - } - if localHeight >= maxForkAncestry { - // We're above the max reorg threshold, find the earliest fork point - floor = int64(localHeight - maxForkAncestry) - } - // If we're doing a light sync, ensure the floor doesn't go below the CHT, as - // all headers before that point will be missing. - if mode == LightSync { - // If we don't know the current CHT position, find it - if d.genesis == 0 { - header := d.lightchain.CurrentHeader() - for header != nil { - d.genesis = header.Number.Uint64() - if floor >= int64(d.genesis)-1 { - break - } - header = d.lightchain.GetHeaderByHash(header.ParentHash) - } - } - // We already know the "genesis" block number, cap floor to that - if floor < int64(d.genesis)-1 { - floor = int64(d.genesis) - 1 - } - } - - ancestor, err := d.findAncestorSpanSearch(p, mode, remoteHeight, localHeight, floor) - if err == nil { - return ancestor, nil - } - // The returned error was not nil. - // If the error returned does not reflect that a common ancestor was not found, return it. - // If the error reflects that a common ancestor was not found, continue to binary search, - // where the error value will be reassigned. - if !errors.Is(err, errNoAncestorFound) { - return 0, err - } - - ancestor, err = d.findAncestorBinarySearch(p, mode, remoteHeight, floor) - if err != nil { - return 0, err - } - return ancestor, nil -} - -func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, remoteHeight, localHeight uint64, floor int64) (uint64, error) { - from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) - - p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip) - headers, hashes, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false) - if err != nil { - return 0, err - } - // Wait for the remote response to the head fetch - number, hash := uint64(0), common.Hash{} - - // Make sure the peer actually gave something valid - if len(headers) == 0 { - p.log.Warn("Empty head header set") - return 0, errEmptyHeaderSet - } - // Make sure the peer's reply conforms to the request - for i, header := range headers { - expectNumber := from + int64(i)*int64(skip+1) - if number := header.Number.Int64(); number != expectNumber { - p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) - return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering")) - } - } - // Check if a common ancestor was found - for i := len(headers) - 1; i >= 0; i-- { - // Skip any headers that underflow/overflow our requested set - if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { - continue - } - // Otherwise check if we already know the header or not - h := hashes[i] - n := headers[i].Number.Uint64() - - var known bool - switch mode { - case FullSync: - known = d.blockchain.HasBlock(h, n) - case SnapSync: - known = d.blockchain.HasFastBlock(h, n) - default: - known = d.lightchain.HasHeader(h, n) - } - if known { - number, hash = n, h - break - } - } - // If the head fetch already found an ancestor, return - if hash != (common.Hash{}) { - if int64(number) <= floor { - p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) - return 0, errInvalidAncestor - } - p.log.Debug("Found common ancestor", "number", number, "hash", hash) - return number, nil - } - return 0, errNoAncestorFound -} - -func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, remoteHeight uint64, floor int64) (uint64, error) { - hash := common.Hash{} - - // Ancestor not found, we need to binary search over our chain - start, end := uint64(0), remoteHeight - if floor > 0 { - start = uint64(floor) - } - p.log.Trace("Binary searching for common ancestor", "start", start, "end", end) - - for start+1 < end { - // Split our chain interval in two, and request the hash to cross check - check := (start + end) / 2 - - headers, hashes, err := d.fetchHeadersByNumber(p, check, 1, 0, false) - if err != nil { - return 0, err - } - // Make sure the peer actually gave something valid - if len(headers) != 1 { - p.log.Warn("Multiple headers for single request", "headers", len(headers)) - return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) - } - // Modify the search interval based on the response - h := hashes[0] - n := headers[0].Number.Uint64() - - var known bool - switch mode { - case FullSync: - known = d.blockchain.HasBlock(h, n) - case SnapSync: - known = d.blockchain.HasFastBlock(h, n) - default: - known = d.lightchain.HasHeader(h, n) - } - if !known { - end = check - continue - } - header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists - if header.Number.Uint64() != check { - p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) - return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number) - } - start = check - hash = h - } - // Ensure valid ancestry and return - if int64(start) <= floor { - p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor) - return 0, errInvalidAncestor - } - p.log.Debug("Found common ancestor", "number", start, "hash", hash) - return start, nil -} - -// fetchHeaders keeps retrieving headers concurrently from the number -// requested, until no more are returned, potentially throttling on the way. To -// facilitate concurrency but still protect against malicious nodes sending bad -// headers, we construct a header chain skeleton using the "origin" peer we are -// syncing with, and fill in the missing headers using anyone else. Headers from -// other peers are only accepted if they map cleanly to the skeleton. If no one -// can fill in the skeleton - not even the origin peer - it's assumed invalid and -// the origin is dropped. -func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) error { - p.log.Debug("Directing header downloads", "origin", from) - defer p.log.Debug("Header download terminated") - - // Start pulling the header chain skeleton until all is done - var ( - skeleton = true // Skeleton assembly phase or finishing up - pivoting = false // Whether the next request is pivot verification - ancestor = from - mode = d.getMode() - ) - for { - // Pull the next batch of headers, it either: - // - Pivot check to see if the chain moved too far - // - Skeleton retrieval to permit concurrent header fetches - // - Full header retrieval if we're near the chain head - var ( - headers []*types.Header - hashes []common.Hash - err error - ) - switch { - case pivoting: - d.pivotLock.RLock() - pivot := d.pivotHeader.Number.Uint64() - d.pivotLock.RUnlock() - - p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks)) - headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep - - case skeleton: - p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - headers, hashes, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) - - default: - p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - headers, hashes, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false) - } - switch err { - case nil: - // Headers retrieved, continue with processing - - case errCanceled: - // Sync cancelled, no issue, propagate up - return err - - default: - // Header retrieval either timed out, or the peer failed in some strange way - // (e.g. disconnect). Consider the master peer bad and drop - d.dropPeer(p.id) - - // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { - select { - case ch <- false: - case <-d.cancelCh: - } - } - select { - case d.headerProcCh <- nil: - case <-d.cancelCh: - } - return fmt.Errorf("%w: header request failed: %v", errBadPeer, err) - } - // If the pivot is being checked, move if it became stale and run the real retrieval - var pivot uint64 - - d.pivotLock.RLock() - if d.pivotHeader != nil { - pivot = d.pivotHeader.Number.Uint64() - } - d.pivotLock.RUnlock() - - if pivoting { - if len(headers) == 2 { - if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want { - log.Warn("Peer sent invalid next pivot", "have", have, "want", want) - return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want) - } - if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want { - log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want) - return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want) - } - log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number) - pivot = headers[0].Number.Uint64() - - d.pivotLock.Lock() - d.pivotHeader = headers[0] - d.pivotLock.Unlock() - - // Write out the pivot into the database so a rollback beyond - // it will reenable snap sync and update the state root that - // the state syncer will be downloading. - rawdb.WriteLastPivotNumber(d.stateDB, pivot) - } - // Disable the pivot check and fetch the next batch of headers - pivoting = false - continue - } - // If the skeleton's finished, pull any remaining head headers directly from the origin - if skeleton && len(headers) == 0 { - // A malicious node might withhold advertised headers indefinitely - if from+uint64(MaxHeaderFetch)-1 <= head { - p.log.Warn("Peer withheld skeleton headers", "advertised", head, "withheld", from+uint64(MaxHeaderFetch)-1) - return fmt.Errorf("%w: withheld skeleton headers: advertised %d, withheld #%d", errStallingPeer, head, from+uint64(MaxHeaderFetch)-1) - } - p.log.Debug("No skeleton, fetching headers directly") - skeleton = false - continue - } - // If no more headers are inbound, notify the content fetchers and return - if len(headers) == 0 { - // Don't abort header fetches while the pivot is downloading - if !d.committed.Load() && pivot <= from { - p.log.Debug("No headers, waiting for pivot commit") - select { - case <-time.After(fsHeaderContCheck): - continue - case <-d.cancelCh: - return errCanceled - } - } - // Pivot done (or not in snap sync) and no more headers, terminate the process - p.log.Debug("No more headers available") - select { - case d.headerProcCh <- nil: - return nil - case <-d.cancelCh: - return errCanceled - } - } - // If we received a skeleton batch, resolve internals concurrently - var progressed bool - if skeleton { - filled, hashset, proced, err := d.fillHeaderSkeleton(from, headers) - if err != nil { - p.log.Debug("Skeleton chain invalid", "err", err) - return fmt.Errorf("%w: %v", errInvalidChain, err) - } - headers = filled[proced:] - hashes = hashset[proced:] - - progressed = proced > 0 - from += uint64(proced) - } else { - // A malicious node might withhold advertised headers indefinitely - if n := len(headers); n < MaxHeaderFetch && headers[n-1].Number.Uint64() < head { - p.log.Warn("Peer withheld headers", "advertised", head, "delivered", headers[n-1].Number.Uint64()) - return fmt.Errorf("%w: withheld headers: advertised %d, delivered %d", errStallingPeer, head, headers[n-1].Number.Uint64()) - } - // If we're closing in on the chain head, but haven't yet reached it, delay - // the last few headers so mini reorgs on the head don't cause invalid hash - // chain errors. - if n := len(headers); n > 0 { - // Retrieve the current head we're at - var head uint64 - if mode == LightSync { - head = d.lightchain.CurrentHeader().Number.Uint64() - } else { - head = d.blockchain.CurrentSnapBlock().Number.Uint64() - if full := d.blockchain.CurrentBlock().Number.Uint64(); head < full { - head = full - } - } - // If the head is below the common ancestor, we're actually deduplicating - // already existing chain segments, so use the ancestor as the fake head. - // Otherwise, we might end up delaying header deliveries pointlessly. - if head < ancestor { - head = ancestor - } - // If the head is way older than this batch, delay the last few headers - if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { - delay := reorgProtHeaderDelay - if delay > n { - delay = n - } - headers = headers[:n-delay] - hashes = hashes[:n-delay] - } - } - } - // If no headers have been delivered, or all of them have been delayed, - // sleep a bit and retry. Take care with headers already consumed during - // skeleton filling - if len(headers) == 0 && !progressed { - p.log.Trace("All headers delayed, waiting") - select { - case <-time.After(fsHeaderContCheck): - continue - case <-d.cancelCh: - return errCanceled - } - } - // Insert any remaining new headers and fetch the next batch - if len(headers) > 0 { - p.log.Trace("Scheduling new headers", "count", len(headers), "from", from) - select { - case d.headerProcCh <- &headerTask{ - headers: headers, - hashes: hashes, - }: - case <-d.cancelCh: - return errCanceled - } - from += uint64(len(headers)) - } - // If we're still skeleton filling snap sync, check pivot staleness - // before continuing to the next skeleton filling - if skeleton && pivot > 0 { - pivoting = true - } - } -} - -// fillHeaderSkeleton concurrently retrieves headers from all our available peers -// and maps them to the provided skeleton header chain. -// -// Any partial results from the beginning of the skeleton is (if possible) forwarded -// immediately to the header processor to keep the rest of the pipeline full even -// in the case of header stalls. -// -// The method returns the entire filled skeleton and also the number of headers -// already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, []common.Hash, int, error) { - log.Debug("Filling up skeleton", "from", from) - d.queue.ScheduleSkeleton(from, skeleton) - - err := d.concurrentFetch((*headerQueue)(d), false) - if err != nil { - log.Debug("Skeleton fill failed", "err", err) - } - filled, hashes, proced := d.queue.RetrieveHeaders() - if err == nil { - log.Debug("Skeleton fill succeeded", "filled", len(filled), "processed", proced) - } - return filled, hashes, proced, err -} - // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error { +func (d *Downloader) fetchBodies(from uint64) error { log.Debug("Downloading block bodies", "origin", from) - err := d.concurrentFetch((*bodyQueue)(d), beaconMode) + err := d.concurrentFetch((*bodyQueue)(d)) log.Debug("Block body download terminated", "err", err) return err @@ -1261,9 +636,9 @@ func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error { // fetchReceipts iteratively downloads the scheduled block receipts, taking any // available peers, reserving a chunk of receipts for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { +func (d *Downloader) fetchReceipts(from uint64) error { log.Debug("Downloading receipts", "origin", from) - err := d.concurrentFetch((*receiptQueue)(d), beaconMode) + err := d.concurrentFetch((*receiptQueue)(d)) log.Debug("Receipt download terminated", "err", err) return err @@ -1272,11 +647,10 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error { +func (d *Downloader) processHeaders(origin uint64) error { var ( - mode = d.getMode() - gotHeaders = false // Wait for batches of headers to process - timer = time.NewTimer(time.Second) + mode = d.getMode() + timer = time.NewTimer(time.Second) ) defer timer.Stop() @@ -1295,48 +669,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode case <-d.cancelCh: } } - // If we're in legacy sync mode, we need to check total difficulty - // violations from malicious peers. That is not needed in beacon - // mode and we can skip to terminating sync. - if !beaconMode { - // If no headers were retrieved at all, the peer violated its TD promise that it had a - // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fetcher): - // - // R , L : Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if mode != LightSync { - head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer - } - } - // If snap or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if mode == SnapSync || mode == LightSync { - head := d.lightchain.CurrentHeader() - if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer - } - } - } return nil } // Otherwise split the chunk of headers into batches and process them headers, hashes := task.headers, task.hashes - gotHeaders = true for len(headers) > 0 { // Terminate if something failed in between processing chunks select { @@ -1357,44 +694,12 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode // Although the received headers might be all valid, a legacy // PoW/PoA sync must not accept post-merge headers. Make sure // that any transition is rejected at this point. - var ( - rejected []*types.Header - td *big.Int - ) - if !beaconMode && ttd != nil { - td = d.blockchain.GetTd(chunkHeaders[0].ParentHash, chunkHeaders[0].Number.Uint64()-1) - if td == nil { - // This should never really happen, but handle gracefully for now - log.Error("Failed to retrieve parent header TD", "number", chunkHeaders[0].Number.Uint64()-1, "hash", chunkHeaders[0].ParentHash) - return fmt.Errorf("%w: parent TD missing", errInvalidChain) - } - for i, header := range chunkHeaders { - td = new(big.Int).Add(td, header.Difficulty) - if td.Cmp(ttd) >= 0 { - // Terminal total difficulty reached, allow the last header in - if new(big.Int).Sub(td, header.Difficulty).Cmp(ttd) < 0 { - chunkHeaders, rejected = chunkHeaders[:i+1], chunkHeaders[i+1:] - if len(rejected) > 0 { - // Make a nicer user log as to the first TD truly rejected - td = new(big.Int).Add(td, rejected[0].Difficulty) - } - } else { - chunkHeaders, rejected = chunkHeaders[:i], chunkHeaders[i:] - } - break - } - } - } if len(chunkHeaders) > 0 { if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil { log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) return fmt.Errorf("%w: %v", errInvalidChain, err) } } - if len(rejected) != 0 { - log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd) - return ErrMergeTransition - } } // Unless we're doing light chains, schedule the headers for associated content retrieval if mode == FullSync || mode == SnapSync { @@ -1436,7 +741,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode } // processFullSyncContent takes fetch results from the queue and imports them into the chain. -func (d *Downloader) processFullSyncContent(ttd *big.Int, beaconMode bool) error { +func (d *Downloader) processFullSyncContent() error { for { results := d.queue.Results(true) if len(results) == 0 { @@ -1445,44 +750,9 @@ func (d *Downloader) processFullSyncContent(ttd *big.Int, beaconMode bool) error if d.chainInsertHook != nil { d.chainInsertHook(results) } - // Although the received blocks might be all valid, a legacy PoW/PoA sync - // must not accept post-merge blocks. Make sure that pre-merge blocks are - // imported, but post-merge ones are rejected. - var ( - rejected []*fetchResult - td *big.Int - ) - if !beaconMode && ttd != nil { - td = d.blockchain.GetTd(results[0].Header.ParentHash, results[0].Header.Number.Uint64()-1) - if td == nil { - // This should never really happen, but handle gracefully for now - log.Error("Failed to retrieve parent block TD", "number", results[0].Header.Number.Uint64()-1, "hash", results[0].Header.ParentHash) - return fmt.Errorf("%w: parent TD missing", errInvalidChain) - } - for i, result := range results { - td = new(big.Int).Add(td, result.Header.Difficulty) - if td.Cmp(ttd) >= 0 { - // Terminal total difficulty reached, allow the last block in - if new(big.Int).Sub(td, result.Header.Difficulty).Cmp(ttd) < 0 { - results, rejected = results[:i+1], results[i+1:] - if len(rejected) > 0 { - // Make a nicer user log as to the first TD truly rejected - td = new(big.Int).Add(td, rejected[0].Header.Difficulty) - } - } else { - results, rejected = results[:i], results[i:] - } - break - } - } - } if err := d.importBlockResults(results); err != nil { return err } - if len(rejected) != 0 { - log.Info("Legacy sync reached merge threshold", "number", rejected[0].Header.Number, "hash", rejected[0].Header.Hash(), "td", td, "ttd", ttd) - return ErrMergeTransition - } } } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c810518d56..92403277fd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -20,7 +20,6 @@ import ( "fmt" "math/big" "os" - "strings" "sync" "sync/atomic" "testing" @@ -94,35 +93,16 @@ func (dl *downloadTester) terminate() { os.RemoveAll(dl.freezer) } -// sync starts synchronizing with a remote peer, blocking until it completes. -func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { - head := dl.peers[id].chain.CurrentBlock() - if td == nil { - // If no particular TD was requested, load from the peer's blockchain - td = dl.peers[id].chain.GetTd(head.Hash(), head.Number.Uint64()) - } - // Synchronise with the chosen peer and ensure proper cleanup afterwards - err := dl.downloader.synchronise(id, head.Hash(), td, nil, mode, false, nil) - select { - case <-dl.downloader.cancelCh: - // Ok, downloader fully cancelled after sync cycle - default: - // Downloader is still accepting packets, can block a peer up - panic("downloader active post sync cycle") // panic will be caught by tester - } - return err -} - // newPeer registers a new block download source into the downloader. func (dl *downloadTester) newPeer(id string, version uint, blocks []*types.Block) *downloadTesterPeer { dl.lock.Lock() defer dl.lock.Unlock() peer := &downloadTesterPeer{ - dl: dl, - id: id, - chain: newTestBlockchain(blocks), - withholdHeaders: make(map[common.Hash]struct{}), + dl: dl, + id: id, + chain: newTestBlockchain(blocks), + withholdBodies: make(map[common.Hash]struct{}), } dl.peers[id] = peer @@ -146,11 +126,10 @@ func (dl *downloadTester) dropPeer(id string) { } type downloadTesterPeer struct { - dl *downloadTester - id string - chain *core.BlockChain - - withholdHeaders map[common.Hash]struct{} + dl *downloadTester + withholdBodies map[common.Hash]struct{} + id string + chain *core.BlockChain } // Head constructs a function to retrieve a peer's current head hash @@ -186,15 +165,6 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i Reverse: reverse, }, nil) headers := unmarshalRlpHeaders(rlpHeaders) - // If a malicious peer is simulated withholding headers, delete them - for hash := range dlp.withholdHeaders { - for i, header := range headers { - if header.Hash() == hash { - headers = append(headers[:i], headers[i+1:]...) - break - } - } - } hashes := make([]common.Hash, len(headers)) for i, header := range headers { hashes[i] = header.Hash() @@ -230,15 +200,6 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, Reverse: reverse, }, nil) headers := unmarshalRlpHeaders(rlpHeaders) - // If a malicious peer is simulated withholding headers, delete them - for hash := range dlp.withholdHeaders { - for i, header := range headers { - if header.Hash() == hash { - headers = append(headers[:i], headers[i+1:]...) - break - } - } - } hashes := make([]common.Hash, len(headers)) for i, header := range headers { hashes[i] = header.Hash() @@ -278,7 +239,13 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et ) hasher := trie.NewStackTrie(nil) for i, body := range bodies { - txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher) + hash := types.DeriveSha(types.Transactions(body.Transactions), hasher) + if _, ok := dlp.withholdBodies[hash]; ok { + txsHashes = append(txsHashes[:i], txsHashes[i+1:]...) + uncleHashes = append(uncleHashes[:i], uncleHashes[i+1:]...) + continue + } + txsHashes[i] = hash uncleHashes[i] = types.CalcUncleHash(body.Uncles) } req := ð.Request{ @@ -442,7 +409,10 @@ func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ET func TestCanonicalSynchronisation68Light(t *testing.T) { testCanonSync(t, eth.ETH68, LightSync) } func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) + success := make(chan struct{}) + tester := newTesterWithNotification(t, func() { + close(success) + }) defer tester.terminate() // Create a small enough block chain to download @@ -450,10 +420,15 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { tester.newPeer("peer", protocol, chain.blocks[1:]) // Synchronise with the peer and make sure all relevant data was retrieved - if err := tester.sync("peer", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { + t.Fatalf("failed to beacon-sync chain: %v", err) + } + select { + case <-success: + assertOwnChain(t, tester, len(chain.blocks)) + case <-time.NewTimer(time.Second * 3).C: + t.Fatalf("Failed to sync chain in three seconds") } - assertOwnChain(t, tester, len(chain.blocks)) } // Tests that if a large batch of blocks are being downloaded, it is throttled @@ -479,7 +454,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { // Start a synchronisation concurrently errc := make(chan error, 1) go func() { - errc <- tester.sync("peer", nil, mode) + errc <- tester.downloader.BeaconSync(mode, testChainBase.blocks[len(testChainBase.blocks)-1].Header(), nil) }() // Iteratively take some blocks, always checking the retrieval count for { @@ -535,132 +510,17 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { } } -// Tests that simple synchronization against a forked chain works correctly. In -// this test common ancestor lookup should *not* be short circuited, and a full -// binary search should be executed. -func TestForkedSync68Full(t *testing.T) { testForkedSync(t, eth.ETH68, FullSync) } -func TestForkedSync68Snap(t *testing.T) { testForkedSync(t, eth.ETH68, SnapSync) } -func TestForkedSync68Light(t *testing.T) { testForkedSync(t, eth.ETH68, LightSync) } - -func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + 80) - chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + 81) - tester.newPeer("fork A", protocol, chainA.blocks[1:]) - tester.newPeer("fork B", protocol, chainB.blocks[1:]) - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("fork A", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainA.blocks)) - - // Synchronise with the second peer and make sure that fork is pulled too - if err := tester.sync("fork B", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainB.blocks)) -} - -// Tests that synchronising against a much shorter but much heavier fork works -// currently and is not dropped. -func TestHeavyForkedSync68Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, FullSync) } -func TestHeavyForkedSync68Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, SnapSync) } -func TestHeavyForkedSync68Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, LightSync) } - -func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + 80) - chainB := testChainForkHeavy.shorten(len(testChainBase.blocks) + 79) - tester.newPeer("light", protocol, chainA.blocks[1:]) - tester.newPeer("heavy", protocol, chainB.blocks[1:]) - - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("light", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainA.blocks)) - - // Synchronise with the second peer and make sure that fork is pulled too - if err := tester.sync("heavy", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainB.blocks)) -} - -// Tests that chain forks are contained within a certain interval of the current -// chain head, ensuring that malicious peers cannot waste resources by feeding -// long dead chains. -func TestBoundedForkedSync68Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, FullSync) } -func TestBoundedForkedSync68Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, SnapSync) } -func TestBoundedForkedSync68Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, LightSync) } - -func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chainA := testChainForkLightA - chainB := testChainForkLightB - tester.newPeer("original", protocol, chainA.blocks[1:]) - tester.newPeer("rewriter", protocol, chainB.blocks[1:]) - - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("original", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainA.blocks)) - - // Synchronise with the second peer and ensure that the fork is rejected to being too old - if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor { - t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor) - } -} - -// Tests that chain forks are contained within a certain interval of the current -// chain head for short but heavy forks too. These are a bit special because they -// take different ancestor lookup paths. -func TestBoundedHeavyForkedSync68Full(t *testing.T) { - testBoundedHeavyForkedSync(t, eth.ETH68, FullSync) -} -func TestBoundedHeavyForkedSync68Snap(t *testing.T) { - testBoundedHeavyForkedSync(t, eth.ETH68, SnapSync) -} -func TestBoundedHeavyForkedSync68Light(t *testing.T) { - testBoundedHeavyForkedSync(t, eth.ETH68, LightSync) -} - -func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - // Create a long enough forked chain - chainA := testChainForkLightA - chainB := testChainForkHeavy - tester.newPeer("original", protocol, chainA.blocks[1:]) - - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("original", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chainA.blocks)) - - tester.newPeer("heavy-rewriter", protocol, chainB.blocks[1:]) - // Synchronise with the second peer and ensure that the fork is rejected to being too old - if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor { - t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor) - } -} - // Tests that a canceled download wipes all previously accumulated state. func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) } func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) } func TestCancel68Light(t *testing.T) { testCancel(t, eth.ETH68, LightSync) } func testCancel(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) + complete := make(chan struct{}) + success := func() { + close(complete) + } + tester := newTesterWithNotification(t, success) defer tester.terminate() chain := testChainBase.shorten(MaxHeaderFetch) @@ -672,38 +532,16 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) { t.Errorf("download queue not idle") } // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer", nil, mode); err != nil { + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } + <-complete tester.downloader.Cancel() if !tester.downloader.queue.Idle() { t.Errorf("download queue not idle") } } -// Tests that synchronisation from multiple peers works as intended (multi thread sanity test). -func TestMultiSynchronisation68Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, FullSync) } -func TestMultiSynchronisation68Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, SnapSync) } -func TestMultiSynchronisation68Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, LightSync) } - -func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - // Create various peers with various parts of the chain - targetPeers := 8 - chain := testChainBase.shorten(targetPeers * 100) - - for i := 0; i < targetPeers; i++ { - id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, protocol, chain.shorten(len(chain.blocks) / (i + 1)).blocks[1:]) - } - if err := tester.sync("peer #0", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chain.blocks)) -} - // Tests that synchronisations behave well in multi-version protocol environments // and not wreak havoc on other nodes in the network. func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) } @@ -711,7 +549,11 @@ func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, func TestMultiProtoSynchronisation68Light(t *testing.T) { testMultiProtoSync(t, eth.ETH68, LightSync) } func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) + complete := make(chan struct{}) + success := func() { + close(complete) + } + tester := newTesterWithNotification(t, success) defer tester.terminate() // Create a small enough block chain to download @@ -720,9 +562,14 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // Create peers of every type tester.newPeer("peer 68", eth.ETH68, chain.blocks[1:]) - // Synchronise with the requested peer and make sure all blocks were retrieved - if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { + t.Fatalf("failed to start beacon sync: #{err}") + } + select { + case <-complete: + break + case <-time.NewTimer(time.Second * 3).C: + t.Fatalf("Failed to sync chain in three seconds") } assertOwnChain(t, tester, len(chain.blocks)) @@ -742,7 +589,10 @@ func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.E func TestEmptyShortCircuit68Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) + success := make(chan struct{}) + tester := newTesterWithNotification(t, func() { + close(success) + }) defer tester.terminate() // Create a block chain to download @@ -757,10 +607,19 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { tester.downloader.receiptFetchHook = func(headers []*types.Header) { receiptsHave.Add(int32(len(headers))) } - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("peer", nil, mode); err != nil { + + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } + select { + case <-success: + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(len(chain.blocks) - 1), + CurrentBlock: uint64(len(chain.blocks) - 1), + }) + case <-time.NewTimer(time.Second * 3).C: + t.Fatalf("Failed to sync chain in three seconds") + } assertOwnChain(t, tester, len(chain.blocks)) // Validate the number of block bodies that should have been requested @@ -783,195 +642,6 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { } } -// Tests that headers are enqueued continuously, preventing malicious nodes from -// stalling the downloader by feeding gapped header chains. -func TestMissingHeaderAttack68Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, FullSync) } -func TestMissingHeaderAttack68Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, SnapSync) } -func TestMissingHeaderAttack68Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, LightSync) } - -func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(blockCacheMaxItems - 15) - - attacker := tester.newPeer("attack", protocol, chain.blocks[1:]) - attacker.withholdHeaders[chain.blocks[len(chain.blocks)/2-1].Hash()] = struct{}{} - - if err := tester.sync("attack", nil, mode); err == nil { - t.Fatalf("succeeded attacker synchronisation") - } - // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, chain.blocks[1:]) - if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chain.blocks)) -} - -// Tests that if requested headers are shifted (i.e. first is missing), the queue -// detects the invalid numbering. -func TestShiftedHeaderAttack68Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, FullSync) } -func TestShiftedHeaderAttack68Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, SnapSync) } -func TestShiftedHeaderAttack68Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, LightSync) } - -func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(blockCacheMaxItems - 15) - - // Attempt a full sync with an attacker feeding shifted headers - attacker := tester.newPeer("attack", protocol, chain.blocks[1:]) - attacker.withholdHeaders[chain.blocks[1].Hash()] = struct{}{} - - if err := tester.sync("attack", nil, mode); err == nil { - t.Fatalf("succeeded attacker synchronisation") - } - // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, chain.blocks[1:]) - if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(chain.blocks)) -} - -// Tests that a peer advertising a high TD doesn't get to stall the downloader -// afterwards by not sending any useful hashes. -func TestHighTDStarvationAttack68Full(t *testing.T) { - testHighTDStarvationAttack(t, eth.ETH68, FullSync) -} -func TestHighTDStarvationAttack68Snap(t *testing.T) { - testHighTDStarvationAttack(t, eth.ETH68, SnapSync) -} -func TestHighTDStarvationAttack68Light(t *testing.T) { - testHighTDStarvationAttack(t, eth.ETH68, LightSync) -} - -func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(1) - tester.newPeer("attack", protocol, chain.blocks[1:]) - if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) - } -} - -// Tests that misbehaving peers are disconnected, whilst behaving ones are not. -func TestBlockHeaderAttackerDropping68(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH68) } - -func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { - // Define the disconnection requirement for individual hash fetch errors - tests := []struct { - result error - drop bool - }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop - } - // Run the tests and check disconnection status - tester := newTester(t) - defer tester.terminate() - chain := testChainBase.shorten(1) - - for i, tt := range tests { - // Register a new peer and ensure its presence - id := fmt.Sprintf("test %d", i) - tester.newPeer(id, protocol, chain.blocks[1:]) - if _, ok := tester.peers[id]; !ok { - t.Fatalf("test %d: registered peer not found", i) - } - // Simulate a synchronisation and check the required result - tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - - tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), nil, FullSync) - if _, ok := tester.peers[id]; !ok != tt.drop { - t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) - } - } -} - -// Tests that synchronisation progress (origin block number, current block number -// and highest block number) is tracked and updated correctly. -func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) } -func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) } -func TestSyncProgress68Light(t *testing.T) { testSyncProgress(t, eth.ETH68, LightSync) } - -func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(blockCacheMaxItems - 15) - - // Set a sync init hook to catch progress changes - starting := make(chan struct{}) - progress := make(chan struct{}) - - tester.downloader.syncInitHook = func(origin, latest uint64) { - starting <- struct{}{} - <-progress - } - checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) - - // Synchronise half the blocks and check initial progress - tester.newPeer("peer-half", protocol, chain.shorten(len(chain.blocks) / 2).blocks[1:]) - pending := new(sync.WaitGroup) - pending.Add(1) - - go func() { - defer pending.Done() - if err := tester.sync("peer-half", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chain.blocks)/2 - 1), - }) - progress <- struct{}{} - pending.Wait() - - // Synchronise all the blocks and check continuation progress - tester.newPeer("peer-full", protocol, chain.blocks[1:]) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("peer-full", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ - StartingBlock: uint64(len(chain.blocks)/2 - 1), - CurrentBlock: uint64(len(chain.blocks)/2 - 1), - HighestBlock: uint64(len(chain.blocks) - 1), - }) - - // Check final progress after successful sync - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - StartingBlock: uint64(len(chain.blocks)/2 - 1), - CurrentBlock: uint64(len(chain.blocks) - 1), - HighestBlock: uint64(len(chain.blocks) - 1), - }) -} - func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) { // Mark this method as a helper to report errors at callsite, not in here t.Helper() @@ -982,296 +652,12 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync } } -// Tests that synchronisation progress (origin block number and highest block -// number) is tracked and updated correctly in case of a fork (or manual head -// revertal). -func TestForkedSyncProgress68Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, FullSync) } -func TestForkedSyncProgress68Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, SnapSync) } -func TestForkedSyncProgress68Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, LightSync) } - -func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch) - chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch) - - // Set a sync init hook to catch progress changes - starting := make(chan struct{}) - progress := make(chan struct{}) - - tester.downloader.syncInitHook = func(origin, latest uint64) { - starting <- struct{}{} - <-progress - } - checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) - - // Synchronise with one of the forks and check progress - tester.newPeer("fork A", protocol, chainA.blocks[1:]) - pending := new(sync.WaitGroup) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("fork A", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chainA.blocks) - 1), - }) - progress <- struct{}{} - pending.Wait() - - // Simulate a successful sync above the fork - tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight - - // Synchronise with the second fork and check progress resets - tester.newPeer("fork B", protocol, chainB.blocks[1:]) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("fork B", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ - StartingBlock: uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainA.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) - - // Check final progress after successful sync - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - StartingBlock: uint64(len(testChainBase.blocks)) - 1, - CurrentBlock: uint64(len(chainB.blocks) - 1), - HighestBlock: uint64(len(chainB.blocks) - 1), - }) -} - -// Tests that if synchronisation is aborted due to some failure, then the progress -// origin is not updated in the next sync cycle, as it should be considered the -// continuation of the previous sync and not a new instance. -func TestFailedSyncProgress68Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, FullSync) } -func TestFailedSyncProgress68Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, SnapSync) } -func TestFailedSyncProgress68Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, LightSync) } - -func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(blockCacheMaxItems - 15) - - // Set a sync init hook to catch progress changes - starting := make(chan struct{}) - progress := make(chan struct{}) - - tester.downloader.syncInitHook = func(origin, latest uint64) { - starting <- struct{}{} - <-progress - } - checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) - - // Attempt a full sync with a faulty peer - missing := len(chain.blocks)/2 - 1 - - faulter := tester.newPeer("faulty", protocol, chain.blocks[1:]) - faulter.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{} - - pending := new(sync.WaitGroup) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("faulty", nil, mode); err == nil { - panic("succeeded faulty synchronisation") - } - }() - <-starting - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chain.blocks) - 1), - }) - progress <- struct{}{} - pending.Wait() - afterFailedSync := tester.downloader.Progress() - - // Synchronise with a good peer and check that the progress origin remind the same - // after a failure - tester.newPeer("valid", protocol, chain.blocks[1:]) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("valid", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - checkProgress(t, tester.downloader, "completing", afterFailedSync) - - // Check final progress after successful sync - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - CurrentBlock: uint64(len(chain.blocks) - 1), - HighestBlock: uint64(len(chain.blocks) - 1), - }) -} - -// Tests that if an attacker fakes a chain height, after the attack is detected, -// the progress height is successfully reduced at the next sync invocation. -func TestFakedSyncProgress68Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, FullSync) } -func TestFakedSyncProgress68Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, SnapSync) } -func TestFakedSyncProgress68Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, LightSync) } - -func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { - tester := newTester(t) - defer tester.terminate() - - chain := testChainBase.shorten(blockCacheMaxItems - 15) - - // Set a sync init hook to catch progress changes - starting := make(chan struct{}) - progress := make(chan struct{}) - tester.downloader.syncInitHook = func(origin, latest uint64) { - starting <- struct{}{} - <-progress - } - checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) - - // Create and sync with an attacker that promises a higher chain than available. - attacker := tester.newPeer("attack", protocol, chain.blocks[1:]) - numMissing := 5 - for i := len(chain.blocks) - 2; i > len(chain.blocks)-numMissing; i-- { - attacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{} - } - pending := new(sync.WaitGroup) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.sync("attack", nil, mode); err == nil { - panic("succeeded attacker synchronisation") - } - }() - <-starting - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chain.blocks) - 1), - }) - progress <- struct{}{} - pending.Wait() - afterFailedSync := tester.downloader.Progress() - - // Synchronise with a good peer and check that the progress height has been reduced to - // the true value. - validChain := chain.shorten(len(chain.blocks) - numMissing) - tester.newPeer("valid", protocol, validChain.blocks[1:]) - pending.Add(1) - - go func() { - defer pending.Done() - if err := tester.sync("valid", nil, mode); err != nil { - panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) - } - }() - <-starting - checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ - CurrentBlock: afterFailedSync.CurrentBlock, - HighestBlock: uint64(len(validChain.blocks) - 1), - }) - // Check final progress after successful sync. - progress <- struct{}{} - pending.Wait() - checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ - CurrentBlock: uint64(len(validChain.blocks) - 1), - HighestBlock: uint64(len(validChain.blocks) - 1), - }) -} - -func TestRemoteHeaderRequestSpan(t *testing.T) { - testCases := []struct { - remoteHeight uint64 - localHeight uint64 - expected []int - }{ - // Remote is way higher. We should ask for the remote head and go backwards - {1500, 1000, - []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499}, - }, - {15000, 13006, - []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999}, - }, - // Remote is pretty close to us. We don't have to fetch as many - {1200, 1150, - []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199}, - }, - // Remote is equal to us (so on a fork with higher td) - // We should get the closest couple of ancestors - {1500, 1500, - []int{1497, 1499}, - }, - // We're higher than the remote! Odd - {1000, 1500, - []int{997, 999}, - }, - // Check some weird edgecases that it behaves somewhat rationally - {0, 1500, - []int{0, 2}, - }, - {6000000, 0, - []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999}, - }, - {0, 0, - []int{0, 2}, - }, - } - reqs := func(from, count, span int) []int { - var r []int - num := from - for len(r) < count { - r = append(r, num) - num += span + 1 - } - return r - } - for i, tt := range testCases { - from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight) - data := reqs(int(from), count, span) - - if max != uint64(data[len(data)-1]) { - t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max) - } - failed := false - if len(data) != len(tt.expected) { - failed = true - t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data)) - } else { - for j, n := range data { - if n != tt.expected[j] { - failed = true - break - } - } - } - if failed { - res := strings.ReplaceAll(fmt.Sprint(data), " ", ",") - exp := strings.ReplaceAll(fmt.Sprint(tt.expected), " ", ",") - t.Logf("got: %v\n", res) - t.Logf("exp: %v\n", exp) - t.Errorf("test %d: wrong values", i) - } - } -} - // Tests that peers below a pre-configured checkpoint block are prevented from // being fast-synced from, avoiding potential cheap eclipse attacks. func TestBeaconSync68Full(t *testing.T) { testBeaconSync(t, eth.ETH68, FullSync) } func TestBeaconSync68Snap(t *testing.T) { testBeaconSync(t, eth.ETH68, SnapSync) } func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { - //log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) - var cases = []struct { name string // The name of testing scenario local int // The length of local chain(canonical chain assumed), 0 means genesis is the head @@ -1312,81 +698,67 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { } } -// Tests that synchronisation progress (origin block number and highest block -// number) is tracked and updated correctly in case of manual head reversion -func TestBeaconForkedSyncProgress68Full(t *testing.T) { - testBeaconForkedSyncProgress(t, eth.ETH68, FullSync) -} -func TestBeaconForkedSyncProgress68Snap(t *testing.T) { - testBeaconForkedSyncProgress(t, eth.ETH68, SnapSync) -} -func TestBeaconForkedSyncProgress68Light(t *testing.T) { - testBeaconForkedSyncProgress(t, eth.ETH68, LightSync) -} +// Tests that synchronisation progress (origin block number, current block number +// and highest block number) is tracked and updated correctly. +func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) } +func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) } +func TestSyncProgress68Light(t *testing.T) { testSyncProgress(t, eth.ETH68, LightSync) } -func testBeaconForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { +func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { success := make(chan struct{}) tester := newTesterWithNotification(t, func() { success <- struct{}{} }) defer tester.terminate() - - chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch) - chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch) - - // Set a sync init hook to catch progress changes - starting := make(chan struct{}) - progress := make(chan struct{}) - - tester.downloader.syncInitHook = func(origin, latest uint64) { - starting <- struct{}{} - <-progress - } checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) - // Synchronise with one of the forks and check progress - tester.newPeer("fork A", protocol, chainA.blocks[1:]) - pending := new(sync.WaitGroup) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.downloader.BeaconSync(mode, chainA.blocks[len(chainA.blocks)-1].Header(), nil); err != nil { - panic(fmt.Sprintf("failed to beacon sync: %v", err)) - } - }() + chain := testChainBase.shorten(blockCacheMaxItems - 15) + shortChain := chain.shorten(len(chain.blocks) / 2).blocks[1:] - <-starting - progress <- struct{}{} + // Connect to peer that provides all headers and part of the bodies + faultyPeer := tester.newPeer("peer-half", protocol, shortChain) + for _, header := range shortChain { + faultyPeer.withholdBodies[header.Hash()] = struct{}{} + } + + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)/2-1].Header(), nil); err != nil { + t.Fatalf("failed to beacon-sync chain: %v", err) + } select { case <-success: - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chainA.blocks) - 1), - CurrentBlock: uint64(len(chainA.blocks) - 1), + // Ok, downloader fully cancelled after sync cycle + checkProgress(t, tester.downloader, "peer-half", ethereum.SyncProgress{ + CurrentBlock: uint64(len(chain.blocks)/2 - 1), + HighestBlock: uint64(len(chain.blocks)/2 - 1), }) case <-time.NewTimer(time.Second * 3).C: t.Fatalf("Failed to sync chain in three seconds") } - // Set the head to a second fork - tester.newPeer("fork B", protocol, chainB.blocks[1:]) - pending.Add(1) - go func() { - defer pending.Done() - if err := tester.downloader.BeaconSync(mode, chainB.blocks[len(chainB.blocks)-1].Header(), nil); err != nil { - panic(fmt.Sprintf("failed to beacon sync: %v", err)) - } - }() + // Synchronise all the blocks and check continuation progress + tester.newPeer("peer-full", protocol, chain.blocks[1:]) + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { + t.Fatalf("failed to beacon-sync chain: %v", err) + } + var startingBlock uint64 + if mode == LightSync { + // in light-sync mode: + // * the starting block is 0 on the second sync cycle because blocks + // are never downloaded. + // * The current/highest blocks reported in the progress reflect the + // current/highest header. + startingBlock = 0 + } else { + startingBlock = uint64(len(chain.blocks)/2 - 1) + } - <-starting - progress <- struct{}{} - - // reorg below available state causes the state sync to rewind to genesis select { case <-success: - checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ - HighestBlock: uint64(len(chainB.blocks) - 1), - CurrentBlock: uint64(len(chainB.blocks) - 1), - StartingBlock: 0, + // Ok, downloader fully cancelled after sync cycle + checkProgress(t, tester.downloader, "peer-full", ethereum.SyncProgress{ + StartingBlock: startingBlock, + CurrentBlock: uint64(len(chain.blocks) - 1), + HighestBlock: uint64(len(chain.blocks) - 1), }) case <-time.NewTimer(time.Second * 3).C: t.Fatalf("Failed to sync chain in three seconds") diff --git a/eth/downloader/fetchers.go b/eth/downloader/fetchers.go index cc4279b0da..4ebb9bbc98 100644 --- a/eth/downloader/fetchers.go +++ b/eth/downloader/fetchers.go @@ -68,48 +68,3 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil } } - -// fetchHeadersByNumber is a blocking version of Peer.RequestHeadersByNumber which -// handles all the cancellation, interruption and timeout mechanisms of a data -// retrieval to allow blocking API calls. -func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) { - // Create the response sink and send the network request - start := time.Now() - resCh := make(chan *eth.Response) - - req, err := p.peer.RequestHeadersByNumber(number, amount, skip, reverse, resCh) - if err != nil { - return nil, nil, err - } - defer req.Close() - - // Wait until the response arrives, the request is cancelled or times out - ttl := d.peers.rates.TargetTimeout() - - timeoutTimer := time.NewTimer(ttl) - defer timeoutTimer.Stop() - - select { - case <-d.cancelCh: - return nil, nil, errCanceled - - case <-timeoutTimer.C: - // Header retrieval timed out, update the metrics - p.log.Debug("Header request timed out", "elapsed", ttl) - headerTimeoutMeter.Mark(1) - - return nil, nil, errTimeout - - case res := <-resCh: - // Headers successfully retrieved, update the metrics - headerReqTimer.Update(time.Since(start)) - headerInMeter.Mark(int64(len(*res.Res.(*eth.BlockHeadersRequest)))) - - // Don't reject the packet even if it turns out to be bad, downloader will - // disconnect the peer on its own terms. Simply delivery the headers to - // be processed by the caller - res.Done <- nil - - return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil - } -} diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 649aa27615..9d8cd114c1 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -76,7 +76,7 @@ type typedQueue interface { // concurrentFetch iteratively downloads scheduled block parts, taking available // peers, reserving a chunk of fetch requests for each and waiting for delivery // or timeouts. -func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { +func (d *Downloader) concurrentFetch(queue typedQueue) error { // Create a delivery channel to accept responses from all peers responses := make(chan *eth.Response) @@ -126,10 +126,6 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Prepare the queue and fetch block parts until the block header fetcher's done finished := false for { - // Short circuit if we lost all our peers - if d.peers.Len() == 0 && !beaconMode { - return errNoPeers - } // If there's nothing more to fetch, wait or terminate if queue.pending() == 0 { if len(pending) == 0 && finished { @@ -158,27 +154,20 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { } sort.Sort(&peerCapacitySort{idles, caps}) - var ( - progressed bool - throttled bool - queued = queue.pending() - ) + var throttled bool for _, peer := range idles { // Short circuit if throttling activated or there are no more // queued tasks to be retrieved if throttled { break } - if queued = queue.pending(); queued == 0 { + if queued := queue.pending(); queued == 0 { break } // Reserve a chunk of fetches for a peer. A nil can mean either that // no more headers are available, or that the peer is known not to // have them. - request, progress, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip())) - if progress { - progressed = true - } + request, _, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip())) if throttle { throttled = true throttleCounter.Inc(1) @@ -207,11 +196,6 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { timeout.Reset(ttl) } } - // Make sure that we have peers available for fetching. If all peers have been tried - // and all failed throw an error - if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode { - return errPeersUnavailable - } } // Wait for something to happen select { @@ -315,16 +299,6 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { queue.updateCapacity(peer, 0, 0) } else { d.dropPeer(peer.id) - - // If this peer was the master peer, abort sync immediately - d.cancelLock.RLock() - master := peer.id == d.cancelPeer - d.cancelLock.RUnlock() - - if master { - d.cancel() - return errTimeout - } } case res := <-responses: diff --git a/eth/downloader/fetchers_concurrent_bodies.go b/eth/downloader/fetchers_concurrent_bodies.go index 5105fda66b..56359b33c9 100644 --- a/eth/downloader/fetchers_concurrent_bodies.go +++ b/eth/downloader/fetchers_concurrent_bodies.go @@ -78,7 +78,6 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan if q.bodyFetchHook != nil { q.bodyFetchHook(req.Headers) } - hashes := make([]common.Hash, 0, len(req.Headers)) for _, header := range req.Headers { hashes = append(hashes, header.Hash()) diff --git a/eth/downloader/fetchers_concurrent_headers.go b/eth/downloader/fetchers_concurrent_headers.go deleted file mode 100644 index 8201f4ca74..0000000000 --- a/eth/downloader/fetchers_concurrent_headers.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2021 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package downloader - -import ( - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/eth/protocols/eth" - "github.com/ethereum/go-ethereum/log" -) - -// headerQueue implements typedQueue and is a type adapter between the generic -// concurrent fetcher and the downloader. -type headerQueue Downloader - -// waker returns a notification channel that gets pinged in case more header -// fetches have been queued up, so the fetcher might assign it to idle peers. -func (q *headerQueue) waker() chan bool { - return q.queue.headerContCh -} - -// pending returns the number of headers that are currently queued for fetching -// by the concurrent downloader. -func (q *headerQueue) pending() int { - return q.queue.PendingHeaders() -} - -// capacity is responsible for calculating how many headers a particular peer is -// estimated to be able to retrieve within the allotted round trip time. -func (q *headerQueue) capacity(peer *peerConnection, rtt time.Duration) int { - return peer.HeaderCapacity(rtt) -} - -// updateCapacity is responsible for updating how many headers a particular peer -// is estimated to be able to retrieve in a unit time. -func (q *headerQueue) updateCapacity(peer *peerConnection, items int, span time.Duration) { - peer.UpdateHeaderRate(items, span) -} - -// reserve is responsible for allocating a requested number of pending headers -// from the download queue to the specified peer. -func (q *headerQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) { - return q.queue.ReserveHeaders(peer, items), false, false -} - -// unreserve is responsible for removing the current header retrieval allocation -// assigned to a specific peer and placing it back into the pool to allow -// reassigning to some other peer. -func (q *headerQueue) unreserve(peer string) int { - fails := q.queue.ExpireHeaders(peer) - if fails > 2 { - log.Trace("Header delivery timed out", "peer", peer) - } else { - log.Debug("Header delivery stalling", "peer", peer) - } - return fails -} - -// request is responsible for converting a generic fetch request into a header -// one and sending it to the remote peer for fulfillment. -func (q *headerQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) { - peer.log.Trace("Requesting new batch of headers", "from", req.From) - return peer.peer.RequestHeadersByNumber(req.From, MaxHeaderFetch, 0, false, resCh) -} - -// deliver is responsible for taking a generic response packet from the concurrent -// fetcher, unpacking the header data and delivering it to the downloader's queue. -func (q *headerQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { - headers := *packet.Res.(*eth.BlockHeadersRequest) - hashes := packet.Meta.([]common.Hash) - - accepted, err := q.queue.DeliverHeaders(peer.id, headers, hashes, q.headerProcCh) - switch { - case err == nil && len(headers) == 0: - peer.log.Trace("Requested headers delivered") - case err == nil: - peer.log.Trace("Delivered new batch of headers", "count", len(headers), "accepted", accepted) - default: - peer.log.Debug("Failed to deliver retrieved headers", "err", err) - } - return accepted, err -} diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 46f3febd8b..6043f51372 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -58,7 +58,6 @@ var pregenerated bool func init() { // Reduce some of the parameters to make the tester faster fullMaxForkAncestry = 10000 - lightMaxForkAncestry = 10000 blockCacheMaxItems = 1024 fsHeaderSafetyNet = 256 fsHeaderContCheck = 500 * time.Millisecond