// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package fetcher contains the announcement based header, block or transaction synchronisation.
package fetcher

import (
	"errors"
	"math/rand"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/trie"
)

const (
	lightTimeout        = time.Millisecond       // Time allowance before an announced header is explicitly requested
	arriveTimeout       = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
	gatherSlack         = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
	fetchTimeout        = 5 * time.Second        // Maximum allotted time to return an explicitly requested block/transaction
	reQueueBlockTimeout = 500 * time.Millisecond // Time allowance before blocks are requeued for import
)

const (
	maxUncleDist = 11  // Maximum allowed backward distance from the chain head
	maxQueueDist = 32  // Maximum allowed distance from the chain head to queue
	hashLimit    = 256 // Maximum number of unique blocks or headers a peer may have announced
	blockLimit   = 64  // Maximum number of unique blocks a peer may have delivered
)

var (
	blockAnnounceInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
	blockAnnounceOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
	blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
	blockAnnounceDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)

	blockBroadcastInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
	blockBroadcastOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
	blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
	blockBroadcastDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)

	headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
	bodyFetchMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)

	headerFilterInMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
	headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
	bodyFilterInMeter    = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
	bodyFilterOutMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
)

var errTerminated = errors.New("terminated")

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// headerRequesterFn is a callback type for sending a header retrieval request.
type headerRequesterFn func(common.Hash, chan *eth.Response) (*eth.Request, error)

// bodyRequesterFn is a callback type for sending a body retrieval request.
type bodyRequesterFn func([]common.Hash, chan *eth.Response) (*eth.Request, error)

// headerVerifierFn is a callback type to verify a block's header for fast propagation.
type headerVerifierFn func(header *types.Header) error

// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
type blockBroadcasterFn func(block *types.Block, propagate bool)

// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// chainFinalizedHeightFn is a callback type to retrieve the current chain's finalized height.
type chainFinalizedHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// blockAnnounce is the hash notification of the availability of a new block in the
// network.
type blockAnnounce struct {
	hash   common.Hash   // Hash of the block being announced
	number uint64        // Number of the block being announced (0 = unknown | old protocol)
	header *types.Header // Header of the block partially reassembled (new protocol)
	time   time.Time     // Timestamp of the announcement

	origin string // Identifier of the peer originating the notification

	fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
	fetchBodies bodyRequesterFn   // Fetcher function to retrieve the body of an announced block
}

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
	peer    string          // The source peer of block headers
	headers []*types.Header // Collection of headers to filter
	time    time.Time       // Arrival time of the headers
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
	peer         string                 // The source peer of block bodies
	transactions [][]*types.Transaction // Collection of transactions per block bodies
	uncles       [][]*types.Header      // Collection of uncles per block bodies
	time         time.Time              // Arrival time of the blocks' contents
}

// blockOrHeaderInject represents a scheduled import operation.
type blockOrHeaderInject struct {
	origin string

	header *types.Header // Used for light mode fetcher which only cares about header.
	block  *types.Block  // Used for normal mode fetcher which imports full block.
}

// number returns the block number of the injected object.
func (inject *blockOrHeaderInject) number() uint64 {
	if inject.header != nil {
		return inject.header.Number.Uint64()
	}
	return inject.block.NumberU64()
}

// hash returns the block hash of the injected object.
func (inject *blockOrHeaderInject) hash() common.Hash {
	if inject.header != nil {
		return inject.header.Hash()
	}
	return inject.block.Hash()
}

// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
	light bool // The indicator whether it's a light fetcher or normal one.

	// Various event channels
	notify chan *blockAnnounce
	inject chan *blockOrHeaderInject

	headerFilter chan chan *headerFilterTask
	bodyFilter   chan chan *bodyFilterTask

	done chan common.Hash
	quit chan struct{}

	requeue chan *blockOrHeaderInject // Import operations to be re-queued (e.g. parent was missing at import time)

	// Announce states
	announces  map[string]int                   // Per peer blockAnnounce counts to prevent memory exhaustion
	announced  map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
	fetching   map[common.Hash]*blockAnnounce   // Announced blocks, currently fetching
	fetched    map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
	completing map[common.Hash]*blockAnnounce   // Blocks with headers, currently body-completing

	// Block cache
	queue  *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted)
	queues map[string]int                            // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*blockOrHeaderInject      // Set of already queued blocks (to dedup imports)

	// Callbacks
	getHeader            HeaderRetrievalFn      // Retrieves a header from the local chain
	getBlock             blockRetrievalFn       // Retrieves a block from the local chain
	verifyHeader         headerVerifierFn       // Checks if a block's headers have a valid proof of work
	broadcastBlock       blockBroadcasterFn     // Broadcasts a block to connected peers
	chainHeight          chainHeightFn          // Retrieves the current chain's height
	chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
	insertHeaders        headersInsertFn        // Injects a batch of headers into the chain
	insertChain          chainInsertFn          // Injects a batch of blocks into the chain
	dropPeer             peerDropFn             // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool)           // Method to call upon adding or deleting a hash from the blockAnnounce list
	queueChangeHook    func(common.Hash, bool)           // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)               // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)               // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
}
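
// Lifecycle note: an announced hash moves through the state maps above in
// order: announced (waiting for its fetch timer), fetching (header request in
// flight), fetched (header arrived, body still pending) and completing (body
// request in flight), before finally landing in queue/queued until it is
// imported. forgetHash and forgetBlock unwind these states when an entry
// expires, is dropped or completes.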

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn,
	broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn,
	insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
	return &BlockFetcher{
		light:                light,
		notify:               make(chan *blockAnnounce),
		inject:               make(chan *blockOrHeaderInject),
		headerFilter:         make(chan chan *headerFilterTask),
		bodyFilter:           make(chan chan *bodyFilterTask),
		done:                 make(chan common.Hash),
		quit:                 make(chan struct{}),
		requeue:              make(chan *blockOrHeaderInject),
		announces:            make(map[string]int),
		announced:            make(map[common.Hash][]*blockAnnounce),
		fetching:             make(map[common.Hash]*blockAnnounce),
		fetched:              make(map[common.Hash][]*blockAnnounce),
		completing:           make(map[common.Hash]*blockAnnounce),
		queue:                prque.New[int64, *blockOrHeaderInject](nil),
		queues:               make(map[string]int),
		queued:               make(map[common.Hash]*blockOrHeaderInject),
		getHeader:            getHeader,
		getBlock:             getBlock,
		verifyHeader:         verifyHeader,
		broadcastBlock:       broadcastBlock,
		chainHeight:          chainHeight,
		chainFinalizedHeight: chainFinalizedHeight,
		insertHeaders:        insertHeaders,
		insertChain:          insertChain,
		dropPeer:             dropPeer,
	}
}
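
// A minimal wiring sketch, illustrative only: every identifier below other
// than the fetcher API itself (NewBlockFetcher, Start, Stop, Notify, Enqueue)
// is an assumed caller-side value or helper, not something defined in this
// package.
//
//	f := NewBlockFetcher(false, getHeader, getBlock, verifyHeader, broadcastBlock,
//		chainHeight, finalizedHeight, insertHeaders, insertChain, dropPeer)
//	f.Start()
//	defer f.Stop()
//
//	// On a hash announcement received from a peer:
//	_ = f.Notify(peerID, hash, number, time.Now(), requestHeader, requestBodies)
//
//	// On a full block broadcast received from a peer:
//	_ = f.Enqueue(peerID, block)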

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination is requested.
func (f *BlockFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *BlockFetcher) Stop() {
	close(f.quit)
}

// Notify announces the fetcher of the potential availability of a new block in
// the network.
func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &blockAnnounce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Enqueue tries to fill gaps in the fetcher's future import queue.
func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
	op := &blockOrHeaderInject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	// Send the filter channel to the fetcher
	filter := make(chan *headerFilterTask)

	select {
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

	// Send the filter channel to the fetcher
	filter := make(chan *bodyFilterTask)

	select {
	case f.bodyFilter <- filter:
	case <-f.quit:
		return nil, nil
	}
	// Request the filtering of the body list
	select {
	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
	case <-f.quit:
		return nil, nil
	}
	// Retrieve the bodies remaining after filtering
	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}
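
// Both filters above use a two-step handshake with the fetcher loop: the caller
// first hands over a private reply channel via headerFilter/bodyFilter, then
// sends the task on that channel, and finally reads back whatever was not
// consumed by the fetcher. Every step also selects on f.quit, so a delivery can
// never block a fetcher that is shutting down.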

// loop is the main fetcher loop, checking and processing various notification
// events.
func (f *BlockFetcher) loop() {
	// Iterate the block fetching until a quit is requested
	var (
		fetchTimer    = time.NewTimer(0)
		completeTimer = time.NewTimer(0)
	)
	<-fetchTimer.C // clear out the channel
	<-completeTimer.C

	defer fetchTimer.Stop()
	defer completeTimer.Stop()

	for {
		// Clean up any expired block fetches
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}
		// Import any queued blocks that could potentially fit
		height := f.chainHeight()
		finalizedHeight := f.chainFinalizedHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem()
			hash := op.hash()
			if f.queueChangeHook != nil {
				f.queueChangeHook(hash, false)
			}
			// If too high up the chain or phase, continue later
			number := op.number()
			if number > height+1 {
				f.queue.Push(op, -int64(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}
			// Otherwise if fresh and still unknown, try and import
			if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
				f.forgetBlock(hash)
				continue
			}
			if f.light {
				f.importHeaders(op)
			} else {
				f.importBlocks(op)
			}
		}
		// Wait for an outside event to occur
		select {
		case <-f.quit:
			// BlockFetcher terminating, abort all operations
			return

		case notification := <-f.notify:
			// A block was announced, make sure the peer isn't DOSing us
			blockAnnounceInMeter.Mark(1)

			count := f.announces[notification.origin] + 1
			if count > hashLimit {
				log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
				blockAnnounceDOSMeter.Mark(1)
				break
			}
			if notification.number == 0 {
				break
			}
			// If we have a valid block number, check that it's potentially useful
			if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
				log.Debug("Peer discarded announcement by distance", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
				blockAnnounceDropMeter.Mark(1)
				break
			}
			finalized := f.chainFinalizedHeight()
			if notification.number <= finalized {
				log.Debug("Peer discarded announcement by finality", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "finalized", finalized)
				blockAnnounceDropMeter.Mark(1)
				break
			}
			// All is well, schedule the announce if block's not yet downloading
			if _, ok := f.fetching[notification.hash]; ok {
				break
			}
			if _, ok := f.completing[notification.hash]; ok {
				break
			}
			f.announces[notification.origin] = count
			f.announced[notification.hash] = append(f.announced[notification.hash], notification)
			if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
				f.announceChangeHook(notification.hash, true)
			}
			if len(f.announced) == 1 {
				f.rescheduleFetch(fetchTimer)
			}

		case op := <-f.requeue:
			// Re-queue blocks that have not been written due to fork block competition
			number := int64(0)
			hash := ""
			if op.header != nil {
				number = op.header.Number.Int64()
				hash = op.header.Hash().String()
			} else if op.block != nil {
				number = op.block.Number().Int64()
				hash = op.block.Hash().String()
			}
			log.Info("Re-queue blocks", "number", number, "hash", hash)
			f.enqueue(op.origin, op.header, op.block)

		case op := <-f.inject:
			// A direct block insertion was requested, try and fill any pending gaps
			blockBroadcastInMeter.Mark(1)

			// Now only direct block injection is allowed, drop the header injection
			// here silently if we receive one.
			if f.light {
				continue
			}
			f.enqueue(op.origin, nil, op.block)

		case hash := <-f.done:
			// A pending import finished, remove all traces of the notification
			f.forgetHash(hash)
			f.forgetBlock(hash)

		case <-fetchTimer.C:
			// At least one block's timer ran out, check for needing retrieval
			request := make(map[string][]common.Hash)

			for hash, announces := range f.announced {
				// In the current LES protocol (les2/les3), only header announce is
				// available, no need to wait too much time for header broadcast.
				timeout := arriveTimeout - gatherSlack
				if f.light {
					timeout = 0
				}
				if time.Since(announces[0].time) > timeout {
					// Pick a random peer to retrieve from, reset all others
					announce := announces[rand.Intn(len(announces))]
					f.forgetHash(hash)

					// If the block still didn't arrive, queue for fetching
					if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
						request[announce.origin] = append(request[announce.origin], hash)
						f.fetching[hash] = announce
					}
				}
			}
			// Send out all block header requests
			for peer, hashes := range request {
				log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

				// Create a closure of the fetch and schedule it on a new thread
				fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes

				go func(peer string) {
					if f.fetchingHook != nil {
						f.fetchingHook(hashes)
					}
					for _, hash := range hashes {
						headerFetchMeter.Mark(1)
						go func(hash common.Hash) {
							resCh := make(chan *eth.Response)

							req, err := fetchHeader(hash, resCh)
							if err != nil {
								return // Legacy code, yolo
							}
							defer req.Close()

							timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
							defer timeout.Stop()

							select {
							case res := <-resCh:
								res.Done <- nil
								f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))

							case <-timeout.C:
								// The peer didn't respond in time. The request
								// was already rescheduled at this point, we were
								// waiting for a catchup. With an unresponsive
								// peer however, it's a protocol violation.
								f.dropPeer(peer)
							}
						}(hash)
					}
				}(peer)
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleFetch(fetchTimer)

		case <-completeTimer.C:
			// At least one header's timer ran out, retrieve everything
			request := make(map[string][]common.Hash)

			for hash, announces := range f.fetched {
				// Pick a random peer to retrieve from, reset all others
				announce := announces[rand.Intn(len(announces))]
				f.forgetHash(hash)

				// If the block still didn't arrive, queue for completion
				if f.getBlock(hash) == nil {
					request[announce.origin] = append(request[announce.origin], hash)
					f.completing[hash] = announce
				}
			}
			// Send out all block body requests
			for peer, hashes := range request {
				log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

				// Create a closure of the fetch and schedule it on a new thread
				if f.completingHook != nil {
					f.completingHook(hashes)
				}
				fetchBodies := f.completing[hashes[0]].fetchBodies
				bodyFetchMeter.Mark(int64(len(hashes)))

				go func(peer string, hashes []common.Hash) {
					resCh := make(chan *eth.Response)

					req, err := fetchBodies(hashes, resCh)
					if err != nil {
						return // Legacy code, yolo
					}
					defer req.Close()

					timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
					defer timeout.Stop()

					select {
					case res := <-resCh:
						res.Done <- nil
						// Ignoring withdrawals here, since the block fetcher is not used post-merge.
						txs, uncles, _ := res.Res.(*eth.BlockBodiesPacket).Unpack()
						f.FilterBodies(peer, txs, uncles, time.Now())

					case <-timeout.C:
						// The peer didn't respond in time. The request
						// was already rescheduled at this point, we were
						// waiting for a catchup. With an unresponsive
						// peer however, it's a protocol violation.
						f.dropPeer(peer)
					}
				}(peer, hashes)
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleComplete(completeTimer)

		case filter := <-f.headerFilter:
			// Headers arrived from a remote peer. Extract those that were explicitly
			// requested by the fetcher, and return everything else so it's delivered
			// to other parts of the system.
			var task *headerFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			headerFilterInMeter.Mark(int64(len(task.headers)))

			// Split the batch of headers into unknown ones (to return to the caller),
			// known incomplete ones (requiring body retrievals) and completed blocks.
			unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
			for _, header := range task.headers {
				hash := header.Hash()

				// Filter fetcher-requested headers from other synchronisation algorithms
				if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
					// If the delivered header does not match the promised number, drop the announcer
					if header.Number.Uint64() != announce.number {
						log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
						f.dropPeer(announce.origin)
						f.forgetHash(hash)
						continue
					}
					// Collect all headers only if we are running in light
					// mode and the headers are not imported by other means.
					if f.light {
						if f.getHeader(hash) == nil {
							announce.header = header
							lightHeaders = append(lightHeaders, announce)
						}
						f.forgetHash(hash)
						continue
					}
					// Only keep if not imported by other means
					if f.getBlock(hash) == nil {
						announce.header = header
						announce.time = task.time

						// If the block is empty (header only), short circuit into the final import queue
						if header.TxHash == types.EmptyTxsHash && header.UncleHash == types.EmptyUncleHash {
							log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

							block := types.NewBlockWithHeader(header)
							block.ReceivedAt = task.time

							complete = append(complete, block)
							f.completing[hash] = announce
							continue
						}
						// Otherwise add to the list of blocks needing completion
						incomplete = append(incomplete, announce)
					} else {
						log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
						f.forgetHash(hash)
					}
				} else {
					// BlockFetcher doesn't know about it, add to the return list
					unknown = append(unknown, header)
				}
			}
			headerFilterOutMeter.Mark(int64(len(unknown)))
			select {
			case filter <- &headerFilterTask{headers: unknown, time: task.time}:
			case <-f.quit:
				return
			}
			// Schedule the retrieved headers for body completion
			for _, announce := range incomplete {
				hash := announce.header.Hash()
				if _, ok := f.completing[hash]; ok {
					continue
				}
				f.fetched[hash] = append(f.fetched[hash], announce)
				if len(f.fetched) == 1 {
					f.rescheduleComplete(completeTimer)
				}
			}
			// Schedule the header for light fetcher import
			for _, announce := range lightHeaders {
				f.enqueue(announce.origin, announce.header, nil)
			}
			// Schedule the header-only blocks for import
			for _, block := range complete {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, nil, block)
				}
			}

		case filter := <-f.bodyFilter:
			// Block bodies arrived, extract any explicitly requested blocks, return the rest
			var task *bodyFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			bodyFilterInMeter.Mark(int64(len(task.transactions)))
			blocks := []*types.Block{}

			// abort early if there's nothing explicitly requested
			if len(f.completing) > 0 {
				for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
					// Match up a body to any possible completion request
					var (
						matched   = false
						uncleHash common.Hash // calculated lazily and reused
						txnHash   common.Hash // calculated lazily and reused
					)
					for hash, announce := range f.completing {
						if f.queued[hash] != nil || announce.origin != task.peer {
							continue
						}
						if uncleHash == (common.Hash{}) {
							uncleHash = types.CalcUncleHash(task.uncles[i])
						}
						if uncleHash != announce.header.UncleHash {
							continue
						}
						if txnHash == (common.Hash{}) {
							txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
						}
						if txnHash != announce.header.TxHash {
							continue
						}
						// Mark the body matched, reassemble if still unknown
						matched = true
						if f.getBlock(hash) == nil {
							block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
							block.ReceivedAt = task.time
							blocks = append(blocks, block)
						} else {
							f.forgetHash(hash)
						}
					}
					if matched {
						task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
						task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
						i--
						continue
					}
				}
			}
			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
			select {
			case filter <- task:
			case <-f.quit:
				return
			}
			// Schedule the retrieved blocks for ordered import
			for _, block := range blocks {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, nil, block)
				}
			}
		}
	}
}

// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
	// Short circuit if no blocks are announced
	if len(f.announced) == 0 {
		return
	}
	// Schedule announcement retrieval quickly for light mode
	// since the server won't send any headers to the client.
	if f.light {
		fetch.Reset(lightTimeout)
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.announced {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	fetch.Reset(arriveTimeout - time.Since(earliest))
}
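
// The reset above arms the timer for the moment the oldest outstanding
// announcement reaches arriveTimeout of age, i.e. exactly when the first
// announce becomes eligible for an explicit header request.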

// rescheduleComplete resets the specified completion timer to the next fetch timeout.
func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
	// Short circuit if no headers are fetched
	if len(f.fetched) == 0 {
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.fetched {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	complete.Reset(gatherSlack - time.Since(earliest))
}

// enqueue schedules a new header or block import operation, if the component
// to be imported has not yet been seen.
func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
	var (
		hash   common.Hash
		number uint64
	)
	if header != nil {
		hash, number = header.Hash(), header.Number.Uint64()
	} else {
		hash, number = block.Hash(), block.NumberU64()
	}
	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
		blockBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
		blockBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any block that is below or equal to the current finalized height
	finalizedHeight := f.chainFinalizedHeight()
	if number <= finalizedHeight {
		log.Debug("Discarded delivered header or block, below or equal to finalized", "peer", peer, "number", number, "hash", hash, "finalized", finalizedHeight)
		blockBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	if _, ok := f.queued[hash]; !ok {
		op := &blockOrHeaderInject{origin: peer}
		if header != nil {
			op.header = header
		} else {
			op.block = block
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -int64(number))
		if f.queueChangeHook != nil {
			f.queueChangeHook(hash, true)
		}
		log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
	}
}
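
// Note that enqueue pushes operations with priority -number, so the priority
// queue surfaces the lowest block numbers first and the main loop imports
// queued items in roughly ascending chain order.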

// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
	peer := op.origin
	header := op.header
	hash := header.Hash()

	log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
	go func() {
		// If the parent's unknown, abort insertion
		parent := f.getHeader(header.ParentHash)
		if parent == nil {
			log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
			time.Sleep(reQueueBlockTimeout)

			// forget block first, then re-queue
			f.done <- hash
			f.requeue <- op
			return
		}
		defer func() { f.done <- hash }()

		// Validate the header and if something went wrong, drop the peer
		if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
			log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
			log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
			return
		}
		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(header, nil)
		}
	}()
}

// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
	peer := op.origin
	block := op.block
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			time.Sleep(reQueueBlockTimeout)

			// forget block first, then re-queue
			f.done <- hash
			f.requeue <- op
			return
		}
		defer func() { f.done <- hash }()

		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers
			blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case consensus.ErrFutureBlock:
			log.Error("Received future block", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)

		default:
			// Something went very wrong, drop the peer
			log.Error("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, broadcast the block
		blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(nil, block)
		}
	}()
}
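
// Both import paths above treat a missing parent the same way: wait for
// reQueueBlockTimeout, clear the fetcher state via f.done, and push the
// operation back through f.requeue so it is re-evaluated against the local
// chain on a later pass of the main loop.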

// forgetHash removes all traces of a block announcement from the fetcher's
// internal state.
func (f *BlockFetcher) forgetHash(hash common.Hash) {
	// Remove all pending announces and decrement DOS counters
	if announceMap, ok := f.announced[hash]; ok {
		for _, announce := range announceMap {
			f.announces[announce.origin]--
			if f.announces[announce.origin] <= 0 {
				delete(f.announces, announce.origin)
			}
		}
		delete(f.announced, hash)
		if f.announceChangeHook != nil {
			f.announceChangeHook(hash, false)
		}
	}
	// Remove any pending fetches and decrement the DOS counters
	if announce := f.fetching[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.fetching, hash)
	}
	// Remove any pending completion requests and decrement the DOS counters
	for _, announce := range f.fetched[hash] {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
	}
	delete(f.fetched, hash)

	// Remove any pending completions and decrement the DOS counters
	if announce := f.completing[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.completing, hash)
	}
}

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
	if insert := f.queued[hash]; insert != nil {
		f.queues[insert.origin]--
		if f.queues[insert.origin] == 0 {
			delete(f.queues, insert.origin)
		}
		delete(f.queued, hash)
	}
}