528d97b541
fix: lint error fix: ut error fix: code review comments
389 lines
14 KiB
Go
389 lines
14 KiB
Go
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package downloader
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/prque"
|
|
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
)
|
|
|
|
// timeoutGracePeriod is the amount of time to allow for a peer to deliver a
// response to a locally already timed out request. Timeouts are not penalized
// as a peer might be temporarily overloaded, however, they still must reply
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute
|
|
|
|
// typedQueue is an interface defining the adaptor needed to translate the type
|
|
// specific downloader/queue schedulers into the type-agnostic general concurrent
|
|
// fetcher algorithm calls.
|
|
type typedQueue interface {
|
|
// waker returns a notification channel that gets pinged in case more fetches
|
|
// have been queued up, so the fetcher might assign it to idle peers.
|
|
waker() chan bool
|
|
|
|
// pending returns the number of wrapped items that are currently queued for
|
|
// fetching by the concurrent downloader.
|
|
pending() int
|
|
|
|
// capacity is responsible for calculating how many items of the abstracted
|
|
// type a particular peer is estimated to be able to retrieve within the
|
|
// allotted round trip time.
|
|
capacity(peer *peerConnection, rtt time.Duration) int
|
|
|
|
// updateCapacity is responsible for updating how many items of the abstracted
|
|
// type a particular peer is estimated to be able to retrieve in a unit time.
|
|
updateCapacity(peer *peerConnection, items int, elapsed time.Duration)
|
|
|
|
// reserve is responsible for allocating a requested number of pending items
|
|
// from the download queue to the specified peer.
|
|
reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool)
|
|
|
|
// unreserve is responsible for removing the current retrieval allocation
|
|
// assigned to a specific peer and placing it back into the pool to allow
|
|
// reassigning to some other peer.
|
|
unreserve(peer string) int
|
|
|
|
// request is responsible for converting a generic fetch request into a typed
|
|
// one and sending it to the remote peer for fulfillment.
|
|
request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error)
|
|
|
|
// deliver is responsible for taking a generic response packet from the
|
|
// concurrent fetcher, unpacking the type specific data and delivering
|
|
// it to the downloader's queue.
|
|
deliver(peer *peerConnection, packet *eth.Response) (int, error)
|
|
}
|
|
|
|
// concurrentFetch iteratively downloads scheduled block parts, taking available
|
|
// peers, reserving a chunk of fetch requests for each and waiting for delivery
|
|
// or timeouts.
|
|
func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
|
|
// Create a delivery channel to accept responses from all peers
|
|
responses := make(chan *eth.Response)
|
|
|
|
// Track the currently active requests and their timeout order
|
|
pending := make(map[string]*eth.Request)
|
|
defer func() {
|
|
// Abort all requests on sync cycle cancellation. The requests may still
|
|
// be fulfilled by the remote side, but the dispatcher will not wait to
|
|
// deliver them since nobody's going to be listening.
|
|
for _, req := range pending {
|
|
req.Close()
|
|
}
|
|
}()
|
|
ordering := make(map[*eth.Request]int)
|
|
timeouts := prque.New[int64, *eth.Request](func(data *eth.Request, index int) {
|
|
if index < 0 {
|
|
delete(ordering, data)
|
|
return
|
|
}
|
|
ordering[data] = index
|
|
})
|
|
|
|
timeout := time.NewTimer(0)
|
|
if !timeout.Stop() {
|
|
<-timeout.C
|
|
}
|
|
defer timeout.Stop()
|
|
|
|
// Track the timed-out but not-yet-answered requests separately. We want to
|
|
// keep tracking which peers are busy (potentially overloaded), so removing
|
|
// all trace of a timed out request is not good. We also can't just cancel
|
|
// the pending request altogether as that would prevent a late response from
|
|
// being delivered, thus never unblocking the peer.
|
|
stales := make(map[string]*eth.Request)
|
|
defer func() {
|
|
// Abort all requests on sync cycle cancellation. The requests may still
|
|
// be fulfilled by the remote side, but the dispatcher will not wait to
|
|
// deliver them since nobody's going to be listening.
|
|
for _, req := range stales {
|
|
req.Close()
|
|
}
|
|
}()
|
|
// Subscribe to peer lifecycle events to schedule tasks to new joiners and
|
|
// reschedule tasks upon disconnections. We don't care which event happened
|
|
// for simplicity, so just use a single channel.
|
|
peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection
|
|
|
|
peeringSub := d.peers.SubscribeEvents(peering)
|
|
defer peeringSub.Unsubscribe()
|
|
|
|
// Prepare the queue and fetch block parts until the block header fetcher's done
|
|
finished := false
|
|
for {
|
|
// Short circuit if we lost all our peers
|
|
if d.peers.Len() == 0 && !beaconMode {
|
|
return errNoPeers
|
|
}
|
|
// If there's nothing more to fetch, wait or terminate
|
|
if queue.pending() == 0 {
|
|
if len(pending) == 0 && finished {
|
|
return nil
|
|
}
|
|
} else {
|
|
// Send a download request to all idle peers, until throttled
|
|
var (
|
|
idles []*peerConnection
|
|
caps []int
|
|
)
|
|
for _, peer := range d.peers.AllPeers() {
|
|
pending, stale := pending[peer.id], stales[peer.id]
|
|
if pending == nil && stale == nil {
|
|
idles = append(idles, peer)
|
|
caps = append(caps, queue.capacity(peer, time.Second))
|
|
} else if stale != nil {
|
|
if waited := time.Since(stale.Sent); waited > timeoutGracePeriod {
|
|
// Request has been in flight longer than the grace period
|
|
// permitted it, consider the peer malicious attempting to
|
|
// stall the sync.
|
|
peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
|
|
d.dropPeer(peer.id)
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(&peerCapacitySort{idles, caps})
|
|
|
|
var (
|
|
progressed bool
|
|
throttled bool
|
|
queued = queue.pending()
|
|
)
|
|
for _, peer := range idles {
|
|
// Short circuit if throttling activated or there are no more
|
|
// queued tasks to be retrieved
|
|
if throttled {
|
|
break
|
|
}
|
|
if queued = queue.pending(); queued == 0 {
|
|
break
|
|
}
|
|
// Reserve a chunk of fetches for a peer. A nil can mean either that
|
|
// no more headers are available, or that the peer is known not to
|
|
// have them.
|
|
request, progress, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip()))
|
|
if progress {
|
|
progressed = true
|
|
}
|
|
if throttle {
|
|
throttled = true
|
|
throttleCounter.Inc(1)
|
|
}
|
|
if request == nil {
|
|
continue
|
|
}
|
|
// Fetch the chunk and make sure any errors return the hashes to the queue
|
|
req, err := queue.request(peer, request, responses)
|
|
if err != nil {
|
|
// Sending the request failed, which generally means the peer
|
|
// was disconnected in between assignment and network send.
|
|
// Although all peer removal operations return allocated tasks
|
|
// to the queue, that is async, and we can do better here by
|
|
// immediately pushing the unfulfilled requests.
|
|
queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
|
|
continue
|
|
}
|
|
pending[peer.id] = req
|
|
|
|
ttl := d.peers.rates.TargetTimeout()
|
|
ordering[req] = timeouts.Size()
|
|
|
|
timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
|
|
if timeouts.Size() == 1 {
|
|
timeout.Reset(ttl)
|
|
}
|
|
}
|
|
// Make sure that we have peers available for fetching. If all peers have been tried
|
|
// and all failed throw an error
|
|
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
|
|
return errPeersUnavailable
|
|
}
|
|
}
|
|
// Wait for something to happen
|
|
select {
|
|
case <-d.cancelCh:
|
|
// If sync was cancelled, tear down the parallel retriever. Pending
|
|
// requests will be cancelled locally, and the remote responses will
|
|
// be dropped when they arrive
|
|
return errCanceled
|
|
|
|
case event := <-peering:
|
|
// A peer joined or left, the tasks queue and allocations need to be
|
|
// checked for potential assignment or reassignment
|
|
peerid := event.peer.id
|
|
|
|
if event.join {
|
|
// Sanity check the internal state; this can be dropped later
|
|
if _, ok := pending[peerid]; ok {
|
|
event.peer.log.Error("Pending request exists for joining peer")
|
|
}
|
|
if _, ok := stales[peerid]; ok {
|
|
event.peer.log.Error("Stale request exists for joining peer")
|
|
}
|
|
// Loop back to the entry point for task assignment
|
|
continue
|
|
}
|
|
// A peer left, any existing requests need to be untracked, pending
|
|
// tasks returned and possible reassignment checked
|
|
if req, ok := pending[peerid]; ok {
|
|
queue.unreserve(peerid) // TODO(karalabe): This needs a non-expiration method
|
|
delete(pending, peerid)
|
|
req.Close()
|
|
|
|
if index, live := ordering[req]; live {
|
|
if index >= 0 && index < timeouts.Size() {
|
|
timeouts.Remove(index)
|
|
if index == 0 {
|
|
if !timeout.Stop() {
|
|
<-timeout.C
|
|
}
|
|
if timeouts.Size() > 0 {
|
|
_, exp := timeouts.Peek()
|
|
timeout.Reset(time.Until(time.Unix(0, -exp)))
|
|
}
|
|
}
|
|
}
|
|
delete(ordering, req)
|
|
}
|
|
}
|
|
if req, ok := stales[peerid]; ok {
|
|
delete(stales, peerid)
|
|
req.Close()
|
|
}
|
|
|
|
case <-timeout.C:
|
|
// Retrieve the next request which should have timed out. The check
|
|
// below is purely for to catch programming errors, given the correct
|
|
// code, there's no possible order of events that should result in a
|
|
// timeout firing for a non-existent event.
|
|
req, exp := timeouts.Peek()
|
|
if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
|
|
log.Error("Timeout triggered but not reached", "left", at.Sub(now))
|
|
timeout.Reset(at.Sub(now))
|
|
continue
|
|
}
|
|
// Stop tracking the timed out request from a timing perspective,
|
|
// cancel it, so it's not considered in-flight anymore, but keep
|
|
// the peer marked busy to prevent assigning a second request and
|
|
// overloading it further.
|
|
delete(pending, req.Peer)
|
|
stales[req.Peer] = req
|
|
|
|
timeouts.Pop() // Popping an item will reorder indices in `ordering`, delete after, otherwise will resurrect!
|
|
if timeouts.Size() > 0 {
|
|
_, exp := timeouts.Peek()
|
|
timeout.Reset(time.Until(time.Unix(0, -exp)))
|
|
}
|
|
delete(ordering, req)
|
|
|
|
// New timeout potentially set if there are more requests pending,
|
|
// reschedule the failed one to a free peer
|
|
fails := queue.unreserve(req.Peer)
|
|
|
|
// Finally, update the peer's retrieval capacity, or if it's already
|
|
// below the minimum allowance, drop the peer. If a lot of retrieval
|
|
// elements expired, we might have overestimated the remote peer or
|
|
// perhaps ourselves. Only reset to minimal throughput but don't drop
|
|
// just yet.
|
|
//
|
|
// The reason the minimum threshold is 2 is that the downloader tries
|
|
// to estimate the bandwidth and latency of a peer separately, which
|
|
// requires pushing the measured capacity a bit and seeing how response
|
|
// times reacts, to it always requests one more than the minimum (i.e.
|
|
// min 2).
|
|
peer := d.peers.Peer(req.Peer)
|
|
if peer == nil {
|
|
// If the peer got disconnected in between, we should really have
|
|
// short-circuited it already. Just in case there's some strange
|
|
// codepath, leave this check in not to crash.
|
|
log.Error("Delivery timeout from unknown peer", "peer", req.Peer)
|
|
continue
|
|
}
|
|
if fails > 2 {
|
|
queue.updateCapacity(peer, 0, 0)
|
|
} else {
|
|
d.dropPeer(peer.id)
|
|
|
|
// If this peer was the master peer, abort sync immediately
|
|
d.cancelLock.RLock()
|
|
master := peer.id == d.cancelPeer
|
|
d.cancelLock.RUnlock()
|
|
|
|
if master {
|
|
d.cancel()
|
|
return errTimeout
|
|
}
|
|
}
|
|
|
|
case res := <-responses:
|
|
// Response arrived, it may be for an existing or an already timed
|
|
// out request. If the former, update the timeout heap and perhaps
|
|
// reschedule the timeout timer.
|
|
index, live := ordering[res.Req]
|
|
if live {
|
|
if index >= 0 && index < timeouts.Size() {
|
|
timeouts.Remove(index)
|
|
if index == 0 {
|
|
if !timeout.Stop() {
|
|
<-timeout.C
|
|
}
|
|
if timeouts.Size() > 0 {
|
|
_, exp := timeouts.Peek()
|
|
timeout.Reset(time.Until(time.Unix(0, -exp)))
|
|
}
|
|
}
|
|
}
|
|
delete(ordering, res.Req)
|
|
}
|
|
// Delete the pending request (if it still exists) and mark the peer idle
|
|
delete(pending, res.Req.Peer)
|
|
delete(stales, res.Req.Peer)
|
|
|
|
// Signal the dispatcher that the round trip is done. We'll drop the
|
|
// peer if the data turns out to be junk.
|
|
res.Done <- nil
|
|
res.Req.Close()
|
|
|
|
// If the peer was previously banned and failed to deliver its pack
|
|
// in a reasonable time frame, ignore its message.
|
|
if peer := d.peers.Peer(res.Req.Peer); peer != nil {
|
|
// Deliver the received chunk of data and check chain validity
|
|
accepted, err := queue.deliver(peer, res)
|
|
if errors.Is(err, errInvalidChain) {
|
|
return err
|
|
}
|
|
// Unless a peer delivered something completely else than requested (usually
|
|
// caused by a timed out request which came through in the end), set it to
|
|
// idle. If the delivery's stale, the peer should have already been idled.
|
|
if !errors.Is(err, errStaleDelivery) {
|
|
queue.updateCapacity(peer, accepted, res.Time)
|
|
}
|
|
}
|
|
|
|
case cont := <-queue.waker():
|
|
// The header fetcher sent a continuation flag, check if it's done
|
|
if !cont {
|
|
finished = true
|
|
}
|
|
}
|
|
}
|
|
}
|