Merge pull request #15138 from karalabe/statesync-peer-drops

eth/downloader: track peer drops and deassign state sync tasks
This commit is contained in:
Péter Szilágyi 2017-09-12 15:33:32 +03:00 committed by GitHub
commit 382c9266e6
2 changed files with 42 additions and 9 deletions

@ -349,9 +349,10 @@ func (p *peerConnection) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain // peerSet represents the collection of active peer participating in the chain
// download procedure. // download procedure.
type peerSet struct { type peerSet struct {
peers map[string]*peerConnection peers map[string]*peerConnection
newPeerFeed event.Feed newPeerFeed event.Feed
lock sync.RWMutex peerDropFeed event.Feed
lock sync.RWMutex
} }
// newPeerSet creates a new peer set top track the active download sources. // newPeerSet creates a new peer set top track the active download sources.
@ -361,10 +362,16 @@ func newPeerSet() *peerSet {
} }
} }
// SubscribeNewPeers subscribes to peer arrival events.
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription { func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
return ps.newPeerFeed.Subscribe(ch) return ps.newPeerFeed.Subscribe(ch)
} }
// SubscribePeerDrops subscribes to peer departure events.
func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
return ps.peerDropFeed.Subscribe(ch)
}
// Reset iterates over the current peer set, and resets each of the known peers // Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval. // to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() { func (ps *peerSet) Reset() {
@ -419,12 +426,15 @@ func (ps *peerSet) Register(p *peerConnection) error {
// actions to/from that particular entity. // actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error { func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() p, ok := ps.peers[id]
if !ok {
if _, ok := ps.peers[id]; !ok { defer ps.lock.Unlock()
return errNotRegistered return errNotRegistered
} }
delete(ps.peers, id) delete(ps.peers, id)
ps.lock.Unlock()
ps.peerDropFeed.Send(p)
return nil return nil
} }

@ -40,6 +40,7 @@ type stateReq struct {
timer *time.Timer // Timer to fire when the RTT timeout expires timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peerConnection // Peer that we're requesting from peer *peerConnection // Peer that we're requesting from
response [][]byte // Response data of the peer (nil for timeouts) response [][]byte // Response data of the peer (nil for timeouts)
dropped bool // Flag whether the peer dropped off early
} }
// timedOut returns if this request timed out. // timedOut returns if this request timed out.
@ -105,6 +106,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
go s.run() go s.run()
defer s.Cancel() defer s.Cancel()
// Listen for peer departure events to cancel assigned tasks
peerDrop := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
defer peerSub.Unsubscribe()
for { for {
// Enable sending of the first buffered element if there is one. // Enable sending of the first buffered element if there is one.
var ( var (
@ -143,6 +149,20 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished = append(finished, req) finished = append(finished, req)
delete(active, pack.PeerId()) delete(active, pack.PeerId())
// Handle dropped peer connections:
case p := <-peerDrop:
// Skip if no request is currently pending
req := active[p.id]
if req == nil {
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.dropped = true
finished = append(finished, req)
delete(active, p.id)
// Handle timed-out requests: // Handle timed-out requests:
case req := <-timeout: case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout. // If the peer is already requesting something else, ignore the stale timeout.
@ -167,6 +187,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Make sure the previous one doesn't get siletly lost // Make sure the previous one doesn't get siletly lost
old.timer.Stop()
old.dropped = true
finished = append(finished, old) finished = append(finished, old)
} }
// Start a timer to notify the sync loop if the peer stalled. // Start a timer to notify the sync loop if the peer stalled.
@ -269,9 +292,9 @@ func (s *stateSync) loop() error {
return errCancelStateFetch return errCancelStateFetch
case req := <-s.deliver: case req := <-s.deliver:
// Response or timeout triggered, drop the peer if stalling // Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
if len(req.items) <= 2 && req.timedOut() { if len(req.items) <= 2 && !req.dropped && req.timedOut() {
// 2 items are the minimum requested, if even that times out, we've no use of // 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment. // this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)