2ce00adb55
* focus on performance improvement in many aspects. 1. Do BlockBody verification concurrently; 2. Do calculation of intermediate root concurrently; 3. Preload accounts before processing blocks; 4. Make the snapshot layers configurable. 5. Reuse some object to reduce GC. add * rlp: improve decoder stream implementation (#22858) This commit makes various cleanup changes to rlp.Stream. * rlp: shrink Stream struct This removes a lot of unused padding space in Stream by reordering the fields. The size of Stream changes from 120 bytes to 88 bytes. Stream instances are internally cached and reused using sync.Pool, so this does not improve performance. * rlp: simplify list stack The list stack kept track of the size of the current list context as well as the current offset into it. The size had to be stored in the stack in order to subtract it from the remaining bytes of any enclosing list in ListEnd. It seems that this can be implemented in a simpler way: just subtract the size from the enclosing list context in List instead. * rlp: use atomic.Value for type cache (#22902) All encoding/decoding operations read the type cache to find the writer/decoder function responsible for a type. When analyzing CPU profiles of geth during sync, I found that the use of sync.RWMutex in cache lookups appears in the profiles. It seems we are running into CPU cache contention problems when package rlp is heavily used on all CPU cores during sync. This change makes it use atomic.Value + a writer lock instead of sync.RWMutex. In the common case where the typeinfo entry is present in the cache, we simply fetch the map and lookup the type. * rlp: optimize byte array handling (#22924) This change improves the performance of encoding/decoding [N]byte. name old time/op new time/op delta DecodeByteArrayStruct-8 336ns ± 0% 246ns ± 0% -26.98% (p=0.000 n=9+10) EncodeByteArrayStruct-8 225ns ± 1% 148ns ± 1% -34.12% (p=0.000 n=10+10) name old alloc/op new alloc/op delta DecodeByteArrayStruct-8 120B ± 0% 48B ± 0% -60.00% (p=0.000 n=10+10) EncodeByteArrayStruct-8 0.00B 0.00B ~ (all equal) * rlp: optimize big.Int decoding for size <= 32 bytes (#22927) This change grows the static integer buffer in Stream to 32 bytes, making it possible to decode 256bit integers without allocating a temporary buffer. In the recent commit 088da24, Stream struct size decreased from 120 bytes down to 88 bytes. This commit grows the struct to 112 bytes again, but the size change will not degrade performance because Stream instances are internally cached in sync.Pool. name old time/op new time/op delta DecodeBigInts-8 12.2µs ± 0% 8.6µs ± 4% -29.58% (p=0.000 n=9+10) name old speed new speed delta DecodeBigInts-8 230MB/s ± 0% 326MB/s ± 4% +42.04% (p=0.000 n=9+10) * eth/protocols/eth, les: avoid Raw() when decoding HashOrNumber (#22841) Getting the raw value is not necessary to decode this type, and decoding it directly from the stream is faster. * fix testcase * debug no lazy * fix can not repair * address comments Co-authored-by: Felix Lange <fjl@twurst.com>
887 lines
32 KiB
Go
887 lines
32 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package fetcher contains the announcement based header, blocks or transaction synchronisation.
|
|
package fetcher
|
|
|
|
import (
|
|
"errors"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/gopool"
|
|
"github.com/ethereum/go-ethereum/common/prque"
|
|
"github.com/ethereum/go-ethereum/consensus"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/trie"
|
|
)
|
|
|
|
const (
|
|
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
|
|
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
|
|
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
|
|
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
|
|
)
|
|
|
|
const (
|
|
maxUncleDist = 11 // Maximum allowed backward distance from the chain head
|
|
maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
|
|
hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced
|
|
blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
|
|
)
|
|
|
|
var (
|
|
blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
|
|
blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
|
|
blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
|
|
blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)
|
|
|
|
blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
|
|
blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
|
|
blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
|
|
blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)
|
|
|
|
headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
|
|
bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)
|
|
|
|
headerFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
|
|
headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
|
|
bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
|
|
bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
|
|
)
|
|
|
|
var errTerminated = errors.New("terminated")
|
|
|
|
// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
|
|
type HeaderRetrievalFn func(common.Hash) *types.Header
|
|
|
|
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
|
|
type blockRetrievalFn func(common.Hash) *types.Block
|
|
|
|
// headerRequesterFn is a callback type for sending a header retrieval request.
|
|
type headerRequesterFn func(common.Hash) error
|
|
|
|
// bodyRequesterFn is a callback type for sending a body retrieval request.
|
|
type bodyRequesterFn func([]common.Hash) error
|
|
|
|
// headerVerifierFn is a callback type to verify a block's header for fast propagation.
|
|
type headerVerifierFn func(header *types.Header) error
|
|
|
|
// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
|
|
type blockBroadcasterFn func(block *types.Block, propagate bool)
|
|
|
|
// chainHeightFn is a callback type to retrieve the current chain height.
|
|
type chainHeightFn func() uint64
|
|
|
|
// headersInsertFn is a callback type to insert a batch of headers into the local chain.
|
|
type headersInsertFn func(headers []*types.Header) (int, error)
|
|
|
|
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
|
|
type chainInsertFn func(types.Blocks) (int, error)
|
|
|
|
// peerDropFn is a callback type for dropping a peer detected as malicious.
|
|
type peerDropFn func(id string)
|
|
|
|
// blockAnnounce is the hash notification of the availability of a new block in the
|
|
// network.
|
|
type blockAnnounce struct {
|
|
hash common.Hash // Hash of the block being announced
|
|
number uint64 // Number of the block being announced (0 = unknown | old protocol)
|
|
header *types.Header // Header of the block partially reassembled (new protocol)
|
|
time time.Time // Timestamp of the announcement
|
|
|
|
origin string // Identifier of the peer originating the notification
|
|
|
|
fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
|
|
fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
|
|
}
|
|
|
|
// headerFilterTask represents a batch of headers needing fetcher filtering.
|
|
type headerFilterTask struct {
|
|
peer string // The source peer of block headers
|
|
headers []*types.Header // Collection of headers to filter
|
|
time time.Time // Arrival time of the headers
|
|
}
|
|
|
|
// bodyFilterTask represents a batch of block bodies (transactions and uncles)
|
|
// needing fetcher filtering.
|
|
type bodyFilterTask struct {
|
|
peer string // The source peer of block bodies
|
|
transactions [][]*types.Transaction // Collection of transactions per block bodies
|
|
uncles [][]*types.Header // Collection of uncles per block bodies
|
|
time time.Time // Arrival time of the blocks' contents
|
|
}
|
|
|
|
// blockOrHeaderInject represents a schedules import operation.
|
|
type blockOrHeaderInject struct {
|
|
origin string
|
|
|
|
header *types.Header // Used for light mode fetcher which only cares about header.
|
|
block *types.Block // Used for normal mode fetcher which imports full block.
|
|
}
|
|
|
|
// number returns the block number of the injected object.
|
|
func (inject *blockOrHeaderInject) number() uint64 {
|
|
if inject.header != nil {
|
|
return inject.header.Number.Uint64()
|
|
}
|
|
return inject.block.NumberU64()
|
|
}
|
|
|
|
// number returns the block hash of the injected object.
|
|
func (inject *blockOrHeaderInject) hash() common.Hash {
|
|
if inject.header != nil {
|
|
return inject.header.Hash()
|
|
}
|
|
return inject.block.Hash()
|
|
}
|
|
|
|
// BlockFetcher is responsible for accumulating block announcements from various peers
|
|
// and scheduling them for retrieval.
|
|
type BlockFetcher struct {
|
|
light bool // The indicator whether it's a light fetcher or normal one.
|
|
|
|
// Various event channels
|
|
notify chan *blockAnnounce
|
|
inject chan *blockOrHeaderInject
|
|
|
|
headerFilter chan chan *headerFilterTask
|
|
bodyFilter chan chan *bodyFilterTask
|
|
|
|
done chan common.Hash
|
|
quit chan struct{}
|
|
|
|
// Announce states
|
|
announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
|
|
announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
|
|
fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching
|
|
fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
|
|
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
|
|
|
|
// Block cache
|
|
queue *prque.Prque // Queue containing the import operations (block number sorted)
|
|
queues map[string]int // Per peer block counts to prevent memory exhaustion
|
|
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
|
|
|
|
// Callbacks
|
|
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
|
|
getBlock blockRetrievalFn // Retrieves a block from the local chain
|
|
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
|
|
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
|
|
chainHeight chainHeightFn // Retrieves the current chain's height
|
|
insertHeaders headersInsertFn // Injects a batch of headers into the chain
|
|
insertChain chainInsertFn // Injects a batch of blocks into the chain
|
|
dropPeer peerDropFn // Drops a peer for misbehaving
|
|
|
|
// Testing hooks
|
|
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
|
|
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
|
|
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
|
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
|
importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
|
|
}
|
|
|
|
// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
|
|
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
|
|
return &BlockFetcher{
|
|
light: light,
|
|
notify: make(chan *blockAnnounce),
|
|
inject: make(chan *blockOrHeaderInject),
|
|
headerFilter: make(chan chan *headerFilterTask),
|
|
bodyFilter: make(chan chan *bodyFilterTask),
|
|
done: make(chan common.Hash),
|
|
quit: make(chan struct{}),
|
|
announces: make(map[string]int),
|
|
announced: make(map[common.Hash][]*blockAnnounce),
|
|
fetching: make(map[common.Hash]*blockAnnounce),
|
|
fetched: make(map[common.Hash][]*blockAnnounce),
|
|
completing: make(map[common.Hash]*blockAnnounce),
|
|
queue: prque.New(nil),
|
|
queues: make(map[string]int),
|
|
queued: make(map[common.Hash]*blockOrHeaderInject),
|
|
getHeader: getHeader,
|
|
getBlock: getBlock,
|
|
verifyHeader: verifyHeader,
|
|
broadcastBlock: broadcastBlock,
|
|
chainHeight: chainHeight,
|
|
insertHeaders: insertHeaders,
|
|
insertChain: insertChain,
|
|
dropPeer: dropPeer,
|
|
}
|
|
}
|
|
|
|
// Start boots up the announcement based synchroniser, accepting and processing
|
|
// hash notifications and block fetches until termination requested.
|
|
func (f *BlockFetcher) Start() {
|
|
go f.loop()
|
|
}
|
|
|
|
// Stop terminates the announcement based synchroniser, canceling all pending
|
|
// operations.
|
|
func (f *BlockFetcher) Stop() {
|
|
close(f.quit)
|
|
}
|
|
|
|
// Notify announces the fetcher of the potential availability of a new block in
|
|
// the network.
|
|
func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
|
|
headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
|
|
block := &blockAnnounce{
|
|
hash: hash,
|
|
number: number,
|
|
time: time,
|
|
origin: peer,
|
|
fetchHeader: headerFetcher,
|
|
fetchBodies: bodyFetcher,
|
|
}
|
|
select {
|
|
case f.notify <- block:
|
|
return nil
|
|
case <-f.quit:
|
|
return errTerminated
|
|
}
|
|
}
|
|
|
|
// Enqueue tries to fill gaps the fetcher's future import queue.
|
|
func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
|
|
op := &blockOrHeaderInject{
|
|
origin: peer,
|
|
block: block,
|
|
}
|
|
select {
|
|
case f.inject <- op:
|
|
return nil
|
|
case <-f.quit:
|
|
return errTerminated
|
|
}
|
|
}
|
|
|
|
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
|
|
// returning those that should be handled differently.
|
|
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
|
|
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
|
|
|
|
// Send the filter channel to the fetcher
|
|
filter := make(chan *headerFilterTask)
|
|
|
|
select {
|
|
case f.headerFilter <- filter:
|
|
case <-f.quit:
|
|
return nil
|
|
}
|
|
// Request the filtering of the header list
|
|
select {
|
|
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
|
|
case <-f.quit:
|
|
return nil
|
|
}
|
|
// Retrieve the headers remaining after filtering
|
|
select {
|
|
case task := <-filter:
|
|
return task.headers
|
|
case <-f.quit:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// FilterBodies extracts all the block bodies that were explicitly requested by
|
|
// the fetcher, returning those that should be handled differently.
|
|
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
|
|
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
|
|
|
|
// Send the filter channel to the fetcher
|
|
filter := make(chan *bodyFilterTask)
|
|
|
|
select {
|
|
case f.bodyFilter <- filter:
|
|
case <-f.quit:
|
|
return nil, nil
|
|
}
|
|
// Request the filtering of the body list
|
|
select {
|
|
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
|
|
case <-f.quit:
|
|
return nil, nil
|
|
}
|
|
// Retrieve the bodies remaining after filtering
|
|
select {
|
|
case task := <-filter:
|
|
return task.transactions, task.uncles
|
|
case <-f.quit:
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
// Loop is the main fetcher loop, checking and processing various notification
|
|
// events.
|
|
func (f *BlockFetcher) loop() {
|
|
// Iterate the block fetching until a quit is requested
|
|
var (
|
|
fetchTimer = time.NewTimer(0)
|
|
completeTimer = time.NewTimer(0)
|
|
)
|
|
<-fetchTimer.C // clear out the channel
|
|
<-completeTimer.C
|
|
defer fetchTimer.Stop()
|
|
defer completeTimer.Stop()
|
|
|
|
for {
|
|
// Clean up any expired block fetches
|
|
for hash, announce := range f.fetching {
|
|
if time.Since(announce.time) > fetchTimeout {
|
|
f.forgetHash(hash)
|
|
}
|
|
}
|
|
// Import any queued blocks that could potentially fit
|
|
height := f.chainHeight()
|
|
for !f.queue.Empty() {
|
|
op := f.queue.PopItem().(*blockOrHeaderInject)
|
|
hash := op.hash()
|
|
if f.queueChangeHook != nil {
|
|
f.queueChangeHook(hash, false)
|
|
}
|
|
// If too high up the chain or phase, continue later
|
|
number := op.number()
|
|
if number > height+1 {
|
|
f.queue.Push(op, -int64(number))
|
|
if f.queueChangeHook != nil {
|
|
f.queueChangeHook(hash, true)
|
|
}
|
|
break
|
|
}
|
|
// Otherwise if fresh and still unknown, try and import
|
|
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
|
|
f.forgetBlock(hash)
|
|
continue
|
|
}
|
|
if f.light {
|
|
f.importHeaders(op.origin, op.header)
|
|
} else {
|
|
f.importBlocks(op.origin, op.block)
|
|
}
|
|
}
|
|
// Wait for an outside event to occur
|
|
select {
|
|
case <-f.quit:
|
|
// BlockFetcher terminating, abort all operations
|
|
return
|
|
|
|
case notification := <-f.notify:
|
|
// A block was announced, make sure the peer isn't DOSing us
|
|
blockAnnounceInMeter.Mark(1)
|
|
|
|
count := f.announces[notification.origin] + 1
|
|
if count > hashLimit {
|
|
log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
|
|
blockAnnounceDOSMeter.Mark(1)
|
|
break
|
|
}
|
|
// If we have a valid block number, check that it's potentially useful
|
|
if notification.number > 0 {
|
|
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
|
|
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
|
|
blockAnnounceDropMeter.Mark(1)
|
|
break
|
|
}
|
|
}
|
|
// All is well, schedule the announce if block's not yet downloading
|
|
if _, ok := f.fetching[notification.hash]; ok {
|
|
break
|
|
}
|
|
if _, ok := f.completing[notification.hash]; ok {
|
|
break
|
|
}
|
|
f.announces[notification.origin] = count
|
|
f.announced[notification.hash] = append(f.announced[notification.hash], notification)
|
|
if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
|
|
f.announceChangeHook(notification.hash, true)
|
|
}
|
|
if len(f.announced) == 1 {
|
|
f.rescheduleFetch(fetchTimer)
|
|
}
|
|
|
|
case op := <-f.inject:
|
|
// A direct block insertion was requested, try and fill any pending gaps
|
|
blockBroadcastInMeter.Mark(1)
|
|
|
|
// Now only direct block injection is allowed, drop the header injection
|
|
// here silently if we receive.
|
|
if f.light {
|
|
continue
|
|
}
|
|
f.enqueue(op.origin, nil, op.block)
|
|
|
|
case hash := <-f.done:
|
|
// A pending import finished, remove all traces of the notification
|
|
f.forgetHash(hash)
|
|
f.forgetBlock(hash)
|
|
|
|
case <-fetchTimer.C:
|
|
// At least one block's timer ran out, check for needing retrieval
|
|
request := make(map[string][]common.Hash)
|
|
|
|
for hash, announces := range f.announced {
|
|
// In current LES protocol(les2/les3), only header announce is
|
|
// available, no need to wait too much time for header broadcast.
|
|
timeout := arriveTimeout - gatherSlack
|
|
if f.light {
|
|
timeout = 0
|
|
}
|
|
if time.Since(announces[0].time) > timeout {
|
|
// Pick a random peer to retrieve from, reset all others
|
|
announce := announces[rand.Intn(len(announces))]
|
|
f.forgetHash(hash)
|
|
|
|
// If the block still didn't arrive, queue for fetching
|
|
if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
|
|
request[announce.origin] = append(request[announce.origin], hash)
|
|
f.fetching[hash] = announce
|
|
}
|
|
}
|
|
}
|
|
// Send out all block header requests
|
|
for peer, hashes := range request {
|
|
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
|
|
|
|
// Create a closure of the fetch and schedule in on a new thread
|
|
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
|
|
gopool.Submit(func() {
|
|
if f.fetchingHook != nil {
|
|
f.fetchingHook(hashes)
|
|
}
|
|
for _, hash := range hashes {
|
|
headerFetchMeter.Mark(1)
|
|
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
|
|
}
|
|
})
|
|
}
|
|
// Schedule the next fetch if blocks are still pending
|
|
f.rescheduleFetch(fetchTimer)
|
|
|
|
case <-completeTimer.C:
|
|
// At least one header's timer ran out, retrieve everything
|
|
request := make(map[string][]common.Hash)
|
|
|
|
for hash, announces := range f.fetched {
|
|
// Pick a random peer to retrieve from, reset all others
|
|
announce := announces[rand.Intn(len(announces))]
|
|
f.forgetHash(hash)
|
|
|
|
// If the block still didn't arrive, queue for completion
|
|
if f.getBlock(hash) == nil {
|
|
request[announce.origin] = append(request[announce.origin], hash)
|
|
f.completing[hash] = announce
|
|
}
|
|
}
|
|
// Send out all block body requests
|
|
for peer, hashes := range request {
|
|
log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
|
|
|
|
// Create a closure of the fetch and schedule in on a new thread
|
|
if f.completingHook != nil {
|
|
f.completingHook(hashes)
|
|
}
|
|
bodyFetchMeter.Mark(int64(len(hashes)))
|
|
go f.completing[hashes[0]].fetchBodies(hashes)
|
|
}
|
|
// Schedule the next fetch if blocks are still pending
|
|
f.rescheduleComplete(completeTimer)
|
|
|
|
case filter := <-f.headerFilter:
|
|
// Headers arrived from a remote peer. Extract those that were explicitly
|
|
// requested by the fetcher, and return everything else so it's delivered
|
|
// to other parts of the system.
|
|
var task *headerFilterTask
|
|
select {
|
|
case task = <-filter:
|
|
case <-f.quit:
|
|
return
|
|
}
|
|
headerFilterInMeter.Mark(int64(len(task.headers)))
|
|
|
|
// Split the batch of headers into unknown ones (to return to the caller),
|
|
// known incomplete ones (requiring body retrievals) and completed blocks.
|
|
unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
|
|
for _, header := range task.headers {
|
|
hash := header.Hash()
|
|
|
|
// Filter fetcher-requested headers from other synchronisation algorithms
|
|
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
|
|
// If the delivered header does not match the promised number, drop the announcer
|
|
if header.Number.Uint64() != announce.number {
|
|
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
|
|
f.dropPeer(announce.origin)
|
|
f.forgetHash(hash)
|
|
continue
|
|
}
|
|
// Collect all headers only if we are running in light
|
|
// mode and the headers are not imported by other means.
|
|
if f.light {
|
|
if f.getHeader(hash) == nil {
|
|
announce.header = header
|
|
lightHeaders = append(lightHeaders, announce)
|
|
}
|
|
f.forgetHash(hash)
|
|
continue
|
|
}
|
|
// Only keep if not imported by other means
|
|
if f.getBlock(hash) == nil {
|
|
announce.header = header
|
|
announce.time = task.time
|
|
|
|
// If the block is empty (header only), short circuit into the final import queue
|
|
if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
|
|
log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
|
|
|
|
block := types.NewBlockWithHeader(header)
|
|
block.ReceivedAt = task.time
|
|
|
|
complete = append(complete, block)
|
|
f.completing[hash] = announce
|
|
continue
|
|
}
|
|
// Otherwise add to the list of blocks needing completion
|
|
incomplete = append(incomplete, announce)
|
|
} else {
|
|
log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
|
|
f.forgetHash(hash)
|
|
}
|
|
} else {
|
|
// BlockFetcher doesn't know about it, add to the return list
|
|
unknown = append(unknown, header)
|
|
}
|
|
}
|
|
headerFilterOutMeter.Mark(int64(len(unknown)))
|
|
select {
|
|
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
|
|
case <-f.quit:
|
|
return
|
|
}
|
|
// Schedule the retrieved headers for body completion
|
|
for _, announce := range incomplete {
|
|
hash := announce.header.Hash()
|
|
if _, ok := f.completing[hash]; ok {
|
|
continue
|
|
}
|
|
f.fetched[hash] = append(f.fetched[hash], announce)
|
|
if len(f.fetched) == 1 {
|
|
f.rescheduleComplete(completeTimer)
|
|
}
|
|
}
|
|
// Schedule the header for light fetcher import
|
|
for _, announce := range lightHeaders {
|
|
f.enqueue(announce.origin, announce.header, nil)
|
|
}
|
|
// Schedule the header-only blocks for import
|
|
for _, block := range complete {
|
|
if announce := f.completing[block.Hash()]; announce != nil {
|
|
f.enqueue(announce.origin, nil, block)
|
|
}
|
|
}
|
|
|
|
case filter := <-f.bodyFilter:
|
|
// Block bodies arrived, extract any explicitly requested blocks, return the rest
|
|
var task *bodyFilterTask
|
|
select {
|
|
case task = <-filter:
|
|
case <-f.quit:
|
|
return
|
|
}
|
|
bodyFilterInMeter.Mark(int64(len(task.transactions)))
|
|
blocks := []*types.Block{}
|
|
// abort early if there's nothing explicitly requested
|
|
if len(f.completing) > 0 {
|
|
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
|
|
// Match up a body to any possible completion request
|
|
var (
|
|
matched = false
|
|
uncleHash common.Hash // calculated lazily and reused
|
|
txnHash common.Hash // calculated lazily and reused
|
|
)
|
|
for hash, announce := range f.completing {
|
|
if f.queued[hash] != nil || announce.origin != task.peer {
|
|
continue
|
|
}
|
|
if uncleHash == (common.Hash{}) {
|
|
uncleHash = types.CalcUncleHash(task.uncles[i])
|
|
}
|
|
if uncleHash != announce.header.UncleHash {
|
|
continue
|
|
}
|
|
if txnHash == (common.Hash{}) {
|
|
txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
|
|
}
|
|
if txnHash != announce.header.TxHash {
|
|
continue
|
|
}
|
|
// Mark the body matched, reassemble if still unknown
|
|
matched = true
|
|
if f.getBlock(hash) == nil {
|
|
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
|
|
block.ReceivedAt = task.time
|
|
blocks = append(blocks, block)
|
|
} else {
|
|
f.forgetHash(hash)
|
|
}
|
|
|
|
}
|
|
if matched {
|
|
task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
|
|
task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
|
|
i--
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
bodyFilterOutMeter.Mark(int64(len(task.transactions)))
|
|
select {
|
|
case filter <- task:
|
|
case <-f.quit:
|
|
return
|
|
}
|
|
// Schedule the retrieved blocks for ordered import
|
|
for _, block := range blocks {
|
|
if announce := f.completing[block.Hash()]; announce != nil {
|
|
f.enqueue(announce.origin, nil, block)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
|
|
func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
|
|
// Short circuit if no blocks are announced
|
|
if len(f.announced) == 0 {
|
|
return
|
|
}
|
|
// Schedule announcement retrieval quickly for light mode
|
|
// since server won't send any headers to client.
|
|
if f.light {
|
|
fetch.Reset(lightTimeout)
|
|
return
|
|
}
|
|
// Otherwise find the earliest expiring announcement
|
|
earliest := time.Now()
|
|
for _, announces := range f.announced {
|
|
if earliest.After(announces[0].time) {
|
|
earliest = announces[0].time
|
|
}
|
|
}
|
|
fetch.Reset(arriveTimeout - time.Since(earliest))
|
|
}
|
|
|
|
// rescheduleComplete resets the specified completion timer to the next fetch timeout.
|
|
func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
|
|
// Short circuit if no headers are fetched
|
|
if len(f.fetched) == 0 {
|
|
return
|
|
}
|
|
// Otherwise find the earliest expiring announcement
|
|
earliest := time.Now()
|
|
for _, announces := range f.fetched {
|
|
if earliest.After(announces[0].time) {
|
|
earliest = announces[0].time
|
|
}
|
|
}
|
|
complete.Reset(gatherSlack - time.Since(earliest))
|
|
}
|
|
|
|
// enqueue schedules a new header or block import operation, if the component
|
|
// to be imported has not yet been seen.
|
|
func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
|
|
var (
|
|
hash common.Hash
|
|
number uint64
|
|
)
|
|
if header != nil {
|
|
hash, number = header.Hash(), header.Number.Uint64()
|
|
} else {
|
|
hash, number = block.Hash(), block.NumberU64()
|
|
}
|
|
// Ensure the peer isn't DOSing us
|
|
count := f.queues[peer] + 1
|
|
if count > blockLimit {
|
|
log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
|
|
blockBroadcastDOSMeter.Mark(1)
|
|
f.forgetHash(hash)
|
|
return
|
|
}
|
|
// Discard any past or too distant blocks
|
|
if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
|
|
log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
|
|
blockBroadcastDropMeter.Mark(1)
|
|
f.forgetHash(hash)
|
|
return
|
|
}
|
|
// Schedule the block for future importing
|
|
if _, ok := f.queued[hash]; !ok {
|
|
op := &blockOrHeaderInject{origin: peer}
|
|
if header != nil {
|
|
op.header = header
|
|
} else {
|
|
op.block = block
|
|
}
|
|
f.queues[peer] = count
|
|
f.queued[hash] = op
|
|
f.queue.Push(op, -int64(number))
|
|
if f.queueChangeHook != nil {
|
|
f.queueChangeHook(hash, true)
|
|
}
|
|
log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
|
|
}
|
|
}
|
|
|
|
// importHeaders spawns a new goroutine to run a header insertion into the chain.
|
|
// If the header's number is at the same height as the current import phase, it
|
|
// updates the phase states accordingly.
|
|
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
|
|
hash := header.Hash()
|
|
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
|
|
|
|
go func() {
|
|
defer func() { f.done <- hash }()
|
|
// If the parent's unknown, abort insertion
|
|
parent := f.getHeader(header.ParentHash)
|
|
if parent == nil {
|
|
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
|
|
return
|
|
}
|
|
// Validate the header and if something went wrong, drop the peer
|
|
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
|
|
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
|
|
f.dropPeer(peer)
|
|
return
|
|
}
|
|
// Run the actual import and log any issues
|
|
if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
|
|
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
|
|
return
|
|
}
|
|
// Invoke the testing hook if needed
|
|
if f.importedHook != nil {
|
|
f.importedHook(header, nil)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
|
|
// block's number is at the same height as the current import phase, it updates
|
|
// the phase states accordingly.
|
|
func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
|
|
hash := block.Hash()
|
|
|
|
// Run the import on a new thread
|
|
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
|
|
go func() {
|
|
defer func() { f.done <- hash }()
|
|
|
|
// If the parent's unknown, abort insertion
|
|
parent := f.getBlock(block.ParentHash())
|
|
if parent == nil {
|
|
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
|
|
return
|
|
}
|
|
// Quickly validate the header and propagate the block if it passes
|
|
switch err := f.verifyHeader(block.Header()); err {
|
|
case nil:
|
|
// All ok, quickly propagate to our peers
|
|
blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
|
|
go f.broadcastBlock(block, true)
|
|
|
|
case consensus.ErrFutureBlock:
|
|
log.Error("Received future block", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
|
f.dropPeer(peer)
|
|
|
|
default:
|
|
// Something went very wrong, drop the peer
|
|
log.Error("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
|
f.dropPeer(peer)
|
|
return
|
|
}
|
|
// Run the actual import and log any issues
|
|
if _, err := f.insertChain(types.Blocks{block}); err != nil {
|
|
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
|
return
|
|
}
|
|
// If import succeeded, broadcast the block
|
|
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
|
|
go f.broadcastBlock(block, false)
|
|
|
|
// Invoke the testing hook if needed
|
|
if f.importedHook != nil {
|
|
f.importedHook(nil, block)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// forgetHash removes all traces of a block announcement from the fetcher's
|
|
// internal state.
|
|
func (f *BlockFetcher) forgetHash(hash common.Hash) {
|
|
// Remove all pending announces and decrement DOS counters
|
|
for _, announce := range f.announced[hash] {
|
|
f.announces[announce.origin]--
|
|
if f.announces[announce.origin] <= 0 {
|
|
delete(f.announces, announce.origin)
|
|
}
|
|
}
|
|
delete(f.announced, hash)
|
|
if f.announceChangeHook != nil {
|
|
f.announceChangeHook(hash, false)
|
|
}
|
|
// Remove any pending fetches and decrement the DOS counters
|
|
if announce := f.fetching[hash]; announce != nil {
|
|
f.announces[announce.origin]--
|
|
if f.announces[announce.origin] <= 0 {
|
|
delete(f.announces, announce.origin)
|
|
}
|
|
delete(f.fetching, hash)
|
|
}
|
|
|
|
// Remove any pending completion requests and decrement the DOS counters
|
|
for _, announce := range f.fetched[hash] {
|
|
f.announces[announce.origin]--
|
|
if f.announces[announce.origin] <= 0 {
|
|
delete(f.announces, announce.origin)
|
|
}
|
|
}
|
|
delete(f.fetched, hash)
|
|
|
|
// Remove any pending completions and decrement the DOS counters
|
|
if announce := f.completing[hash]; announce != nil {
|
|
f.announces[announce.origin]--
|
|
if f.announces[announce.origin] <= 0 {
|
|
delete(f.announces, announce.origin)
|
|
}
|
|
delete(f.completing, hash)
|
|
}
|
|
}
|
|
|
|
// forgetBlock removes all traces of a queued block from the fetcher's internal
|
|
// state.
|
|
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
|
|
if insert := f.queued[hash]; insert != nil {
|
|
f.queues[insert.origin]--
|
|
if f.queues[insert.origin] == 0 {
|
|
delete(f.queues, insert.origin)
|
|
}
|
|
delete(f.queued, hash)
|
|
}
|
|
}
|