Merge pull request #1953 from karalabe/switch-to-fast-peers

eth/downloader: fetch data proportionally to peer capacity
This commit is contained in:
Jeffrey Wilcke 2015-11-19 18:48:53 +01:00
commit f16fab91c8
3 changed files with 257 additions and 245 deletions

@ -45,16 +45,17 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request
hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
// Request the advertised remote head block and wait for the response // Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head}) go p.getBlocks([]common.Hash{p.head})
timeout := time.After(blockSoftTTL) timeout := time.After(hashTTL)
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil { if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of blocks, and demote in case of errors
blocks := packet.(*blockPack).blocks blocks := packet.(*blockPack).blocks
err := d.queue.DeliverBlocks(peer.id, blocks)
switch err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above)
if len(blocks) == 0 {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
break
}
// All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
case errInvalidChain: // Deliver the received chunk of blocks and check chain validity
// The hash chain is invalid (blocks are not ordered properly), abort accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
if err == errInvalidChain {
return err return err
}
case errNoFetchesPending: // Unless a peer delivered something completely else than requested (usually
// Peer probably timed out with its delivery but came through // caused by a timed out request which came through in the end), set it to
// in the end, demote, but allow to to pull from this peer. // idle. If the delivery's stale, the peer should have already been idled.
peer.Demote() if err != errStaleDelivery {
peer.SetBlocksIdle() peer.SetBlocksIdle(accepted)
glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) }
// Issue a log to the user to see what's going on
case errStaleDelivery: switch {
// Delivered something completely else than requested, usually case err == nil && len(blocks) == 0:
// caused by a timeout and delivery during a new sync cycle. glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
// Don't set it to idle as the original request should still be case err == nil:
// in flight. glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
peer.Demote()
glog.V(logger.Detail).Infof("%s: stale delivery", peer)
default: default:
// Peer did something semi-useful, demote but keep it around glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
} }
} }
// Blocks arrived, try to update the progress // Blocks arrived, try to update the progress
@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers return errNoPeers
} }
// Check for block request timeouts and demote the responsible peers // Check for block request timeouts and demote the responsible peers
for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil { if peer := d.peers.Peer(pid); peer != nil {
peer.Demote() if fails > 1 {
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
peer.SetBlocksIdle(0)
} else {
glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
d.dropPeer(pid)
}
} }
} }
// If there's nothing more to fetch, wait or terminate // If there's nothing more to fetch, wait or terminate
@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var ( var (
deliver = func(packet dataPack) error { deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack) pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
} }
expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() } capacity = func(p *peer) int { return p.BlockCapacity() }
setIdle = func(p *peer) { p.SetBodiesIdle() } setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
) )
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var ( var (
deliver = func(packet dataPack) error { deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack) pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts) return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
} }
expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) } expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() } capacity = func(p *peer) int { return p.ReceiptCapacity() }
setIdle = func(p *peer) { p.SetReceiptsIdle() } setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
) )
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Debug).Infof("Downloading node state data") glog.V(logger.Debug).Infof("Downloading node state data")
var ( var (
deliver = func(packet dataPack) error { deliver = func(packet dataPack) (int, error) {
start := time.Now() start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil { if err != nil {
@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
}) })
} }
expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
throttle = func() bool { return false } throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) { reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil return d.queue.ReserveNodeData(p, count), false, nil
} }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() } capacity = func(p *peer) int { return p.NodeDataCapacity() }
setIdle = func(p *peer) { p.SetNodeDataIdle() } setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
) )
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available // fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and // peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts. // also periodically checking for timeouts.
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
idle func() ([]*peer, int), setIdle func(*peer), kind string) error { idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
// Create a ticker to detect expired retrieval tasks // Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil { if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data, and demote in case of errors // Deliver the received chunk of data and check chain validity
switch err := deliver(packet); err { accepted, err := deliver(packet)
case nil: if err == errInvalidChain {
// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
if packet.Items() == 0 {
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
break
}
// All was successful, promote the peer and potentially start processing
peer.Promote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
return err return err
}
case errNoFetchesPending: // Unless a peer delivered something completely else than requested (usually
// Peer probably timed out with its delivery but came through // caused by a timed out request which came through in the end), set it to
// in the end, demote, but allow to to pull from this peer. // idle. If the delivery's stale, the peer should have already been idled.
peer.Demote() if err != errStaleDelivery {
setIdle(peer) setIdle(peer, accepted)
glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) }
// Issue a log to the user to see what's going on
case errStaleDelivery: switch {
// Delivered something completely else than requested, usually case err == nil && packet.Items() == 0:
// caused by a timeout and delivery during a new sync cycle. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
// Don't set it to idle as the original request should still be case err == nil:
// in flight. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
peer.Demote()
glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
default: default:
// Peer did something semi-useful, demote but keep it around glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
} }
} }
// Blocks assembled, try to update the progress // Blocks assembled, try to update the progress
@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
return errNoPeers return errNoPeers
} }
// Check for fetch request timeouts and demote the responsible peers // Check for fetch request timeouts and demote the responsible peers
for _, pid := range expire() { for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil { if peer := d.peers.Peer(pid); peer != nil {
peer.Demote() if fails > 1 {
setIdle(peer) glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) setIdle(peer, 0)
} else {
glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
d.dropPeer(pid)
}
} }
} }
// If there's nothing more to fetch, wait or terminate // If there's nothing more to fetch, wait or terminate

@ -30,8 +30,10 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
// Maximum number of entries allowed on the list or lacking items. const (
const maxLackingHashes = 4096 maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
)
// Hash and block fetchers belonging to eth/61 and below // Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error type relativeHashFetcherFn func(common.Hash) error
@ -59,18 +61,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation
blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
receiptCapacity int32 // Number of receipts allowed to fetch per request receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateCapacity int32 // Number of node data pieces allowed to fetch per request stateThroughput float64 // Number of node data pieces measured to be retrievable per second
blockStarted time.Time // Time instance when the last block (body)fetch was started blockStarted time.Time // Time instance when the last block (body)fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started stateStarted time.Time // Time instance when the last node data fetch was started
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
lackingLock sync.RWMutex // Lock protecting the lacking hashes list
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@ -84,6 +84,7 @@ type peer struct {
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies version int // Eth protocol version number to switch strategies
lock sync.RWMutex
} }
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
@ -93,12 +94,9 @@ func newPeer(id string, version int, head common.Hash,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head, head: head,
blockCapacity: 1, lacking: make(map[common.Hash]struct{}),
receiptCapacity: 1,
stateCapacity: 1,
lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes, getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes, getAbsHashes: getAbsHashes,
@ -117,15 +115,18 @@ func newPeer(id string, version int, head common.Hash,
// Reset clears the internal state of a peer entity. // Reset clears the internal state of a peer entity.
func (p *peer) Reset() { func (p *peer) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.stateIdle, 0)
atomic.StoreInt32(&p.receiptCapacity, 1)
atomic.StoreInt32(&p.stateCapacity, 1) p.blockThroughput = 0
p.receiptThroughput = 0
p.stateThroughput = 0
p.lackingLock.Lock()
p.lacking = make(map[common.Hash]struct{}) p.lacking = make(map[common.Hash]struct{})
p.lackingLock.Unlock()
} }
// Fetch61 sends a block retrieval request to the remote peer. // Fetch61 sends a block retrieval request to the remote peer.
@ -216,107 +217,86 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil return nil
} }
// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// Its block retrieval allowance will also be updated either up- or downwards, // requests. Its estimated block retrieval throughput is updated with that measured
// depending on whether the previous fetch completed in time. // just now.
func (p *peer) SetBlocksIdle() { func (p *peer) SetBlocksIdle(delivered int) {
p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
} }
// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
// Its block body retrieval allowance will also be updated either up- or downwards, // requests. Its estimated body retrieval throughput is updated with that measured
// depending on whether the previous fetch completed in time. // just now.
func (p *peer) SetBodiesIdle() { func (p *peer) SetBodiesIdle(delivered int) {
p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle) p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
} }
// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
// Its receipt retrieval allowance will also be updated either up- or downwards, // retrieval requests. Its estimated receipt retrieval throughput is updated
// depending on whether the previous fetch completed in time. // with that measured just now.
func (p *peer) SetReceiptsIdle() { func (p *peer) SetReceiptsIdle(delivered int) {
p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
} }
// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
// requests. Its node data retrieval allowance will also be updated either up- or // data retrieval requests. Its estimated state retrieval throughput is updated
// downwards, depending on whether the previous fetch completed in time. // with that measured just now.
func (p *peer) SetNodeDataIdle() { func (p *peer) SetNodeDataIdle(delivered int) {
p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
} }
// setIdle sets the peer to idle, allowing it to execute new retrieval requests. // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards, // Its estimated retrieval throughput is updated with that measured just now.
// depending on whether the previous fetch completed in time. func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { // Irrelevant of the scaling, make sure the peer ends up idle
// Update the peer's download allowance based on previous performance defer atomic.StoreInt32(idle, 0)
scale := 2.0
if time.Since(started) > softTTL {
scale = 0.5
if time.Since(started) > hardTTL {
scale = 1 / float64(maxFetch) // reduces capacity to 1
}
}
for {
// Calculate the new download bandwidth allowance
prev := atomic.LoadInt32(capacity)
next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
// Try to update the old value p.lock.RLock()
if atomic.CompareAndSwapInt32(capacity, prev, next) { defer p.lock.RUnlock()
// If we're having problems at 1 capacity, try to find better peers
if next == 1 { // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
p.Demote() if delivered == 0 {
} *throughput = 0
break return
}
} }
// Set the peer to idle to allow further fetch requests // Otherwise update the throughput with a new measurement
atomic.StoreInt32(idle, 0) measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
} }
// BlockCapacity retrieves the peers block download allowance based on its // BlockCapacity retrieves the peers block download allowance based on its
// previously discovered bandwidth capacity. // previously discovered throughput.
func (p *peer) BlockCapacity() int { func (p *peer) BlockCapacity() int {
return int(atomic.LoadInt32(&p.blockCapacity)) p.lock.RLock()
defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
} }
// ReceiptCapacity retrieves the peers block download allowance based on its // ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered bandwidth capacity. // previously discovered throughput.
func (p *peer) ReceiptCapacity() int { func (p *peer) ReceiptCapacity() int {
return int(atomic.LoadInt32(&p.receiptCapacity)) p.lock.RLock()
defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
} }
// NodeDataCapacity retrieves the peers block download allowance based on its // NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered bandwidth capacity. // previously discovered throughput.
func (p *peer) NodeDataCapacity() int { func (p *peer) NodeDataCapacity() int {
return int(atomic.LoadInt32(&p.stateCapacity)) p.lock.RLock()
} defer p.lock.RUnlock()
// Promote increases the peer's reputation. return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)
}
// Demote decreases the peer's reputation or leaves it at 0.
func (p *peer) Demote() {
for {
// Calculate the new reputation value
prev := atomic.LoadInt32(&p.rep)
next := prev / 2
// Try to update the old value
if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
return
}
}
} }
// MarkLacking appends a new entity to the set of items (blocks, receipts, states) // MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the // that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off. // set reaches its maximum allowed capacity, items are randomly dropped off.
func (p *peer) MarkLacking(hash common.Hash) { func (p *peer) MarkLacking(hash common.Hash) {
p.lackingLock.Lock() p.lock.Lock()
defer p.lackingLock.Unlock() defer p.lock.Unlock()
for len(p.lacking) >= maxLackingHashes { for len(p.lacking) >= maxLackingHashes {
for drop, _ := range p.lacking { for drop, _ := range p.lacking {
@ -330,8 +310,8 @@ func (p *peer) MarkLacking(hash common.Hash) {
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking // Lacks retrieves whether the hash of a blockchain item is on the peers lacking
// list (i.e. whether we know that the peer does not have it). // list (i.e. whether we know that the peer does not have it).
func (p *peer) Lacks(hash common.Hash) bool { func (p *peer) Lacks(hash common.Hash) bool {
p.lackingLock.RLock() p.lock.RLock()
defer p.lackingLock.RUnlock() defer p.lock.RUnlock()
_, ok := p.lacking[hash] _, ok := p.lacking[hash]
return ok return ok
@ -339,13 +319,13 @@ func (p *peer) Lacks(hash common.Hash) bool {
// String implements fmt.Stringer. // String implements fmt.Stringer.
func (p *peer) String() string { func (p *peer) String() string {
p.lackingLock.RLock() p.lock.RLock()
defer p.lackingLock.RUnlock() defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id, return fmt.Sprintf("Peer %s [%s]", p.id,
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
fmt.Sprintf("lacking %4d", len(p.lacking)), fmt.Sprintf("lacking %4d", len(p.lacking)),
) )
} }
@ -377,6 +357,10 @@ func (ps *peerSet) Reset() {
// Register injects a new peer into the working set, or returns an error if the // Register injects a new peer into the working set, or returns an error if the
// peer is already known. // peer is already known.
//
// The method also sets the starting throughput values of the new peer to the
// average of all existing peers, to give it a realistic change of being used
// for data retrievals.
func (ps *peerSet) Register(p *peer) error { func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() defer ps.lock.Unlock()
@ -384,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error {
if _, ok := ps.peers[p.id]; ok { if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered return errAlreadyRegistered
} }
if len(ps.peers) > 0 {
p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
for _, peer := range ps.peers {
peer.lock.RLock()
p.blockThroughput += peer.blockThroughput
p.receiptThroughput += peer.receiptThroughput
p.stateThroughput += peer.stateThroughput
peer.lock.RUnlock()
}
p.blockThroughput /= float64(len(ps.peers))
p.receiptThroughput /= float64(len(ps.peers))
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p ps.peers[p.id] = p
return nil return nil
} }
@ -435,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool { idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0 return atomic.LoadInt32(&p.blockIdle) == 0
} }
return ps.idlePeers(61, 61, idle) throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(61, 61, idle, throughput)
} }
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@ -444,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool { idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0 return atomic.LoadInt32(&p.blockIdle) == 0
} }
return ps.idlePeers(62, 64, idle) throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(62, 64, idle, throughput)
} }
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@ -453,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool { idle := func(p *peer) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0 return atomic.LoadInt32(&p.receiptIdle) == 0
} }
return ps.idlePeers(63, 64, idle) throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(63, 64, idle, throughput)
} }
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@ -462,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool { idle := func(p *peer) bool {
return atomic.LoadInt32(&p.stateIdle) == 0 return atomic.LoadInt32(&p.stateIdle) == 0
} }
return ps.idlePeers(63, 64, idle) throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(63, 64, idle, throughput)
} }
// idlePeers retrieves a flat list of all currently idle peers satisfying the // idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness. // protocol version constraints, using the provided function to check idleness.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { // The resulting set of peers are sorted by their measure throughput.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
ps.lock.RLock() ps.lock.RLock()
defer ps.lock.RUnlock() defer ps.lock.RUnlock()
@ -482,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
} }
for i := 0; i < len(idle); i++ { for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ { for j := i + 1; j < len(idle); j++ {
if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i] idle[i], idle[j] = idle[j], idle[i]
} }
} }

@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation. // canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBlocks(timeout time.Duration) []string { func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout // ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation. // allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) []string { func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout // ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation. // allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireReceipts(timeout time.Duration) []string { func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout // ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation. // allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireNodeData(timeout time.Duration) []string { func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// Note, this method expects the queue lock to be already held. The // Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue // Iterate over the expired requests and return each to the queue
peers := []string{} expiries := make(map[string]int)
for id, request := range pendPool { for id, request := range pendPool {
if time.Since(request.Time) > timeout { if time.Since(request.Time) > timeout {
// Update the metrics with the timeout // Update the metrics with the timeout
@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers { for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64())) taskQueue.Push(header, -float32(header.Number.Uint64()))
} }
peers = append(peers, id) // Add the peer to the expiry report along the the number of failed requests
expirations := len(request.Hashes)
if expirations < len(request.Headers) {
expirations = len(request.Headers)
}
expiries[id] = expirations
} }
} }
// Remove the expired requests from the pending pool // Remove the expired requests from the pending pool
for _, id := range peers { for id, _ := range expiries {
delete(pendPool, id) delete(pendPool, id)
} }
return peers return expiries
} }
// DeliverBlocks injects a block retrieval response into the download queue. // DeliverBlocks injects a block retrieval response into the download queue. The
func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { // method returns the number of blocks accepted from the delivery and also wakes
// any threads waiting for data delivery.
func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Short circuit if the blocks were never requested // Short circuit if the blocks were never requested
request := q.blockPendPool[id] request := q.blockPendPool[id]
if request == nil { if request == nil {
return errNoFetchesPending return 0, errNoFetchesPending
} }
blockReqTimer.UpdateSince(request.Time) blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id) delete(q.blockPendPool, id)
@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
} }
} }
// Iterate over the downloaded blocks and add each of them // Iterate over the downloaded blocks and add each of them
errs := make([]error, 0) accepted, errs := 0, make([]error, 0)
for _, block := range blocks { for _, block := range blocks {
// Skip any blocks that were not requested // Skip any blocks that were not requested
hash := block.Hash() hash := block.Hash()
@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash) delete(request.Hashes, hash)
delete(q.hashPool, hash) delete(q.hashPool, hash)
accepted++
} }
// Return all failed or missing fetches to the queue // Return all failed or missing fetches to the queue
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index)) q.hashQueue.Push(hash, float32(index))
} }
// Wake up WaitResults // Wake up WaitResults
q.active.Signal() if accepted > 0 {
q.active.Signal()
}
// If none of the blocks were good, it's a stale delivery // If none of the blocks were good, it's a stale delivery
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return errs[0] return accepted, errs[0]
case len(errs) == len(blocks): case len(errs) == len(blocks):
return errStaleDelivery return accepted, errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return accepted, fmt.Errorf("multiple failures: %v", errs)
} }
} }
// DeliverBodies injects a block body retrieval response into the results queue. // DeliverBodies injects a block body retrieval response into the results queue.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { // The method returns the number of blocks bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
} }
// DeliverReceipts injects a receipt retrieval response into the results queue. // DeliverReceipts injects a receipt retrieval response into the results queue.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { // The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The // Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
// Short circuit if the data was never requested // Short circuit if the data was never requested
request := pendPool[id] request := pendPool[id]
if request == nil { if request == nil {
return errNoFetchesPending return 0, errNoFetchesPending
} }
reqTimer.UpdateSince(request.Time) reqTimer.UpdateSince(request.Time)
delete(pendPool, id) delete(pendPool, id)
@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
} }
// Assemble each of the results with their headers and retrieved data parts // Assemble each of the results with their headers and retrieved data parts
var ( var (
failure error accepted int
useful bool failure error
useful bool
) )
for i, header := range request.Headers { for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found // Short circuit assembly if no more fetch results are found
@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{} donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending-- q.resultCache[index].Pending--
useful = true useful = true
accepted++
// Clean up a successful fetch // Clean up a successful fetch
request.Headers[i] = nil request.Headers[i] = nil
@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
} }
} }
// Wake up WaitResults // Wake up WaitResults
q.active.Signal() if accepted > 0 {
q.active.Signal()
}
// If none of the data was good, it's a stale delivery // If none of the data was good, it's a stale delivery
switch { switch {
case failure == nil || failure == errInvalidChain: case failure == nil || failure == errInvalidChain:
return failure return accepted, failure
case useful: case useful:
return fmt.Errorf("partial failure: %v", failure) return accepted, fmt.Errorf("partial failure: %v", failure)
default: default:
return errStaleDelivery return accepted, errStaleDelivery
} }
} }
// DeliverNodeData injects a node state data retrieval response into the queue. // DeliverNodeData injects a node state data retrieval response into the queue.
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { // The method returns the number of node state entries originally requested, and
// the number of them actually accepted from the delivery.
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Short circuit if the data was never requested // Short circuit if the data was never requested
request := q.statePendPool[id] request := q.statePendPool[id]
if request == nil { if request == nil {
return errNoFetchesPending return 0, errNoFetchesPending
} }
stateReqTimer.UpdateSince(request.Time) stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id) delete(q.statePendPool, id)
@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
} }
} }
// Iterate over the downloaded data and verify each of them // Iterate over the downloaded data and verify each of them
errs := make([]error, 0) accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{} process := []trie.SyncResult{}
for _, blob := range data { for _, blob := range data {
// Skip any blocks that were not requested // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob)) hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok { if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
} }
// Inject the next state trie item into the processing queue // Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob}) process = append(process, trie.SyncResult{hash, blob})
accepted++
delete(request.Hashes, hash) delete(request.Hashes, hash)
delete(q.stateTaskPool, hash) delete(q.stateTaskPool, hash)
@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery // If none of the data items were good, it's a stale delivery
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return accepted, nil
case len(errs) == len(request.Hashes): case len(errs) == len(request.Hashes):
return errStaleDelivery return accepted, errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return accepted, fmt.Errorf("multiple failures: %v", errs)
} }
} }