eth/downloader: header-chain order and ancestry check
This commit is contained in:
parent
0a7d059b6a
commit
99b62f36b6
@ -1078,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
|
|||||||
// Otherwise insert all the new headers, aborting in case of junk
|
// Otherwise insert all the new headers, aborting in case of junk
|
||||||
glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
|
glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
|
||||||
|
|
||||||
inserts := d.queue.Insert(headerPack.headers)
|
inserts := d.queue.Insert(headerPack.headers, from)
|
||||||
if len(inserts) != len(headerPack.headers) {
|
if len(inserts) != len(headerPack.headers) {
|
||||||
glog.V(logger.Debug).Infof("%v: stale headers", p)
|
glog.V(logger.Debug).Infof("%v: stale headers", p)
|
||||||
return errBadPeer
|
return errBadPeer
|
||||||
|
@ -139,10 +139,6 @@ func (dl *downloadTester) sync(id string, td *big.Int) error {
|
|||||||
if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 {
|
if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// If there are queued blocks, but the head is missing, it's a stale leftover
|
|
||||||
if hashes+blocks > 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 && dl.downloader.queue.GetHeadBlock() == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Otherwise sleep a bit and retry
|
// Otherwise sleep a bit and retry
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
}
|
}
|
||||||
@ -660,6 +656,67 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that headers are enqueued continuously, preventing malicious nodes from
|
||||||
|
// stalling the downloader by feeding gapped header chains.
|
||||||
|
func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62) }
|
||||||
|
func TestMissingHeaderAttack63(t *testing.T) { testMissingHeaderAttack(t, 63) }
|
||||||
|
func TestMissingHeaderAttack64(t *testing.T) { testMissingHeaderAttack(t, 64) }
|
||||||
|
|
||||||
|
func testMissingHeaderAttack(t *testing.T, protocol int) {
|
||||||
|
// Create a small enough block chain to download
|
||||||
|
targetBlocks := blockCacheLimit - 15
|
||||||
|
hashes, blocks := makeChain(targetBlocks, 0, genesis)
|
||||||
|
|
||||||
|
tester := newTester()
|
||||||
|
|
||||||
|
// Attempt a full sync with an attacker feeding gapped headers
|
||||||
|
tester.newPeer("attack", protocol, hashes, blocks)
|
||||||
|
missing := targetBlocks / 2
|
||||||
|
delete(tester.peerBlocks["attack"], hashes[missing])
|
||||||
|
|
||||||
|
if err := tester.sync("attack", nil); err == nil {
|
||||||
|
t.Fatalf("succeeded attacker synchronisation")
|
||||||
|
}
|
||||||
|
// Synchronise with the valid peer and make sure sync succeeds
|
||||||
|
tester.newPeer("valid", protocol, hashes, blocks)
|
||||||
|
if err := tester.sync("valid", nil); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
if imported := len(tester.ownBlocks); imported != len(hashes) {
|
||||||
|
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if requested headers are shifted (i.e. first is missing), the queue
|
||||||
|
// detects the invalid numbering.
|
||||||
|
func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62) }
|
||||||
|
func TestShiftedHeaderAttack63(t *testing.T) { testShiftedHeaderAttack(t, 63) }
|
||||||
|
func TestShiftedHeaderAttack64(t *testing.T) { testShiftedHeaderAttack(t, 64) }
|
||||||
|
|
||||||
|
func testShiftedHeaderAttack(t *testing.T, protocol int) {
|
||||||
|
// Create a small enough block chain to download
|
||||||
|
targetBlocks := blockCacheLimit - 15
|
||||||
|
hashes, blocks := makeChain(targetBlocks, 0, genesis)
|
||||||
|
|
||||||
|
tester := newTester()
|
||||||
|
|
||||||
|
// Attempt a full sync with an attacker feeding shifted headers
|
||||||
|
tester.newPeer("attack", protocol, hashes, blocks)
|
||||||
|
delete(tester.peerBlocks["attack"], hashes[len(hashes)-2])
|
||||||
|
|
||||||
|
if err := tester.sync("attack", nil); err == nil {
|
||||||
|
t.Fatalf("succeeded attacker synchronisation")
|
||||||
|
}
|
||||||
|
// Synchronise with the valid peer and make sure sync succeeds
|
||||||
|
tester.newPeer("valid", protocol, hashes, blocks)
|
||||||
|
if err := tester.sync("valid", nil); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
if imported := len(tester.ownBlocks); imported != len(hashes) {
|
||||||
|
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that if a peer sends an invalid body for a requested block, it gets
|
// Tests that if a peer sends an invalid body for a requested block, it gets
|
||||||
// dropped immediately by the downloader.
|
// dropped immediately by the downloader.
|
||||||
func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) }
|
func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) }
|
||||||
|
@ -57,6 +57,7 @@ type queue struct {
|
|||||||
|
|
||||||
headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
|
headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
|
||||||
headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
|
headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
|
||||||
|
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
|
||||||
|
|
||||||
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
|
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
|
||||||
|
|
||||||
@ -91,6 +92,7 @@ func (q *queue) Reset() {
|
|||||||
|
|
||||||
q.headerPool = make(map[common.Hash]*types.Header)
|
q.headerPool = make(map[common.Hash]*types.Header)
|
||||||
q.headerQueue.Reset()
|
q.headerQueue.Reset()
|
||||||
|
q.headerHead = common.Hash{}
|
||||||
|
|
||||||
q.pendPool = make(map[string]*fetchRequest)
|
q.pendPool = make(map[string]*fetchRequest)
|
||||||
|
|
||||||
@ -186,7 +188,7 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
|
|||||||
|
|
||||||
// Insert adds a set of headers for the download queue for scheduling, returning
|
// Insert adds a set of headers for the download queue for scheduling, returning
|
||||||
// the new headers encountered.
|
// the new headers encountered.
|
||||||
func (q *queue) Insert(headers []*types.Header) []*types.Header {
|
func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
@ -196,13 +198,24 @@ func (q *queue) Insert(headers []*types.Header) []*types.Header {
|
|||||||
// Make sure no duplicate requests are executed
|
// Make sure no duplicate requests are executed
|
||||||
hash := header.Hash()
|
hash := header.Hash()
|
||||||
if _, ok := q.headerPool[hash]; ok {
|
if _, ok := q.headerPool[hash]; ok {
|
||||||
glog.V(logger.Warn).Infof("Header %x already scheduled", hash)
|
glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Make sure chain order is honored and preserved throughout
|
||||||
|
if header.Number == nil || header.Number.Uint64() != from {
|
||||||
|
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
|
||||||
|
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
|
||||||
|
break
|
||||||
|
}
|
||||||
// Queue the header for body retrieval
|
// Queue the header for body retrieval
|
||||||
inserts = append(inserts, header)
|
inserts = append(inserts, header)
|
||||||
q.headerPool[hash] = header
|
q.headerPool[hash] = header
|
||||||
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
|
q.headerQueue.Push(header, -float32(header.Number.Uint64()))
|
||||||
|
q.headerHead = hash
|
||||||
|
from++
|
||||||
}
|
}
|
||||||
return inserts
|
return inserts
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user