From 28c32d1b1bc9ed687713aa4de4eaab38d2a4cb08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 14:51:14 +0300 Subject: [PATCH 1/3] eth/downloader: fix #1178, don't request blocks beyond the cache bounds --- eth/downloader/downloader.go | 38 +++++++++++++++++++++++------------- eth/downloader/peer.go | 10 ++++++++++ eth/downloader/queue.go | 2 +- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f52a97610b..806f60f1b8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -281,19 +281,19 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != active.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) return errEmptyHashSet } for _, hash := range hashPack.hashes { if d.banned.Has(hash) { - glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) return ErrInvalidChain } } @@ -301,7 +301,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { done, index := false, 0 for index, head = range hashPack.hashes { if d.hasBlock(head) || d.queue.GetBlock(head) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) + glog.V(logger.Debug).Infof("Found common hash %x", head[:4]) hashPack.hashes = hashPack.hashes[:index] done = true break @@ -310,7 +310,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) return ErrBadPeer } if !done { @@ -365,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } case <-timeout.C: - glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) + glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id) var p *peer // p will be set if a peer can be found // Attempt to find a new peer by checking inclusion of peers best hash in our @@ -386,10 +386,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // by our previous (delayed) peer. active = p p.getHashes(head) - glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) + glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) } } - glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start)) + glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start)) return nil } @@ -421,22 +421,29 @@ out: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks + // Deliver the received chunk of blocks, and demote in case of errors if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { if err == ErrInvalidChain { // The hash chain is invalid (blocks are not ordered properly), abort return err } // Peer did deliver, but some blocks were off, penalize - glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) + glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) peer.Demote() peer.SetIdle() break } - if glog.V(logger.Debug) && len(blockPack.blocks) > 0 { - glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) + // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) + if len(blockPack.blocks) == 0 { + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + peer.Demote() + peer.SetIdle() + break + } + // All was successful, promote the peer + if glog.V(logger.Detail) && len(blockPack.blocks) > 0 { + glog.Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } - // Promote the peer and update it's idle state peer.Promote() peer.SetIdle() } @@ -481,11 +488,14 @@ out: if request == nil { continue } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) + } // Fetch the chunk and check for error. If the peer was somehow // already fetching a chunk due to a bug, it will be returned to // the queue if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) + glog.V(logger.Error).Infof("Peer %s received double work", peer.id) d.queue.Cancel(request) } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 8ef017df71..2b3f8798ec 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -5,6 +5,7 @@ package downloader import ( "errors" + "fmt" "math" "sync" "sync/atomic" @@ -135,6 +136,15 @@ func (p *peer) Demote() { } } +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ + fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("ignored %4d", p.ignored.Size()), + ) +} + // peerSet represents the collection of active peer participating in the block // download procedure. type peerSet struct { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 69d91512a2..02fa667f19 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -225,7 +225,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest { skip := make(map[common.Hash]int) capacity := p.Capacity() - for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() { + for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ { hash, priority := q.hashQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) From 24cca2f18d4a7abb103179a289db09c2d11bd2a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 15:10:43 +0300 Subject: [PATCH 2/3] eth/downloader: log after state updates, easier to debug --- eth/downloader/downloader.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 806f60f1b8..d8dbef726d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -428,24 +428,22 @@ out: return err } // Peer did deliver, but some blocks were off, penalize - glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) peer.Demote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) break } // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) if len(blockPack.blocks) == 0 { - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) peer.Demote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } // All was successful, promote the peer - if glog.V(logger.Detail) && len(blockPack.blocks) > 0 { - glog.Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - } peer.Promote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } case <-ticker.C: // Check for bad peers. Bad peers may indicate a peer not responding From d754c25cc87172dfafeea71116da2260544a3f09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Jun 2015 16:22:55 +0300 Subject: [PATCH 3/3] eth/downloader: drop log entry from peer, it's covered already --- eth/downloader/peer.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 2b3f8798ec..43b50079b4 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -12,8 +12,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) @@ -101,9 +99,6 @@ func (p *peer) SetIdle() { if next == 1 { p.Demote() } - if prev != next { - glog.V(logger.Detail).Infof("%s: changing block download capacity from %d to %d", p.id, prev, next) - } break } }