From 2dd6a62f67a217cc2186f04d7d99565f7782c79b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 17:14:45 +0300 Subject: [PATCH] eth/downloader: support individual peers in the test suite --- eth/downloader/downloader_test.go | 235 +++++++++++++++--------------- 1 file changed, 118 insertions(+), 117 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e7dfbc70b3..6269ed87cf 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -16,6 +16,8 @@ var ( knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} bannedHash = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5} + + genesis = createBlock(1, common.Hash{}, knownHash) ) func createHashes(start, amount int) (hashes []common.Hash) { @@ -51,26 +53,20 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { type downloadTester struct { downloader *Downloader - hashes []common.Hash // Chain of hashes simulating - blocks map[common.Hash]*types.Block // Blocks associated with the hashes - chain []common.Hash // Block-chain being constructed + ownHashes []common.Hash // Hash chain belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + peerHashes map[string][]common.Hash // Hash chain belonging to different test peers + peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers maxHashFetch int // Overrides the maximum number of retrieved hashes - - t *testing.T - done chan bool - activePeerId string } -func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester { +func newTester() *downloadTester { tester := &downloadTester{ - t: t, - - hashes: hashes, - blocks: blocks, - chain: []common.Hash{knownHash}, - - done: make(chan bool), + ownHashes: []common.Hash{knownHash}, + ownBlocks: map[common.Hash]*types.Block{knownHash: genesis}, + peerHashes: make(map[string][]common.Hash), + peerBlocks: make(map[string]map[common.Hash]*types.Block), } var mux event.TypeMux downloader := New(&mux, tester.hasBlock, tester.getBlock, nil) @@ -79,13 +75,6 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types return tester } -// sync is a simple wrapper around the downloader to start synchronisation and -// block until it returns -func (dl *downloadTester) sync(peerId string, head common.Hash) error { - dl.activePeerId = peerId - return dl.downloader.synchronise(peerId, head) -} - // syncTake is starts synchronising with a remote peer, but concurrently it also // starts fetching blocks that the downloader retrieved. IT blocks until both go // routines terminate. @@ -102,12 +91,17 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e time.Sleep(time.Millisecond) } // Take a batch of blocks and accumulate - took = append(took, dl.downloader.TakeBlocks()...) + blocks := dl.downloader.TakeBlocks() + for _, block := range blocks { + dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash()) + dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock + } + took = append(took, blocks...) } done <- struct{}{} }() // Start the downloading, sync the taker and return - err := dl.sync(peerId, head) + err := dl.downloader.synchronise(peerId, head) done <- struct{}{} <-done @@ -115,63 +109,74 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e return took, err } +// hasBlock checks if a block is present in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { - for _, h := range dl.chain { - if h == hash { - return true - } - } - return false + return dl.getBlock(hash) != nil } +// getBlock retrieves a block from the testers canonical chain. func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { - return dl.blocks[knownHash] + return dl.ownBlocks[hash] } -// getHashes retrieves a batch of hashes for reconstructing the chain. -func (dl *downloadTester) getHashes(head common.Hash) error { - limit := MaxHashFetch - if dl.maxHashFetch > 0 { - limit = dl.maxHashFetch +// newPeer registers a new block download source into the downloader. +func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) + if err == nil { + // Assign the owned hashes and blocks to the peer + dl.peerHashes[id] = hashes + dl.peerBlocks[id] = blocks } - // Gather the next batch of hashes - hashes := make([]common.Hash, 0, limit) - for i, hash := range dl.hashes { - if hash == head { - i++ - for len(hashes) < cap(hashes) && i < len(dl.hashes) { - hashes = append(hashes, dl.hashes[i]) + return err +} + +// peerGetBlocksFn constructs a getHashes function associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of hashes from the particularly requested peer. +func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error { + return func(head common.Hash) error { + limit := MaxHashFetch + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + result := make([]common.Hash, 0, limit) + for i, hash := range hashes { + if hash == head { i++ - } - break - } - } - // Delay delivery a bit to allow attacks to unfold - id := dl.activePeerId - go func() { - time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes(id, hashes) - }() - return nil -} - -func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { - return func(hashes []common.Hash) error { - blocks := make([]*types.Block, 0, len(hashes)) - for _, hash := range hashes { - if block, ok := dl.blocks[hash]; ok { - blocks = append(blocks, block) + for len(result) < cap(result) && i < len(hashes) { + result = append(result, hashes[i]) + i++ + } + break } } - go dl.downloader.DeliverBlocks(id, blocks) - + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, result) + }() return nil } } -// newPeer registers a new block download source into the syncer. -func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) error { - return dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id)) +// peerGetBlocksFn constructs a getBlocks function associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of blocks from the particularly requested peer. +func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error { + return func(hashes []common.Hash) error { + blocks := dl.peerBlocks[id] + result := make([]*types.Block, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + result = append(result, block) + } + } + go dl.downloader.DeliverBlocks(id, result) + + return nil + } } // Tests that simple synchronization, without throttling from a good peer works. @@ -181,11 +186,11 @@ func TestSynchronisation(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { @@ -200,11 +205,11 @@ func TestBlockTaking(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and test block retrieval - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { @@ -214,7 +219,7 @@ func TestBlockTaking(t *testing.T) { // Tests that an inactive downloader will not accept incoming hashes and blocks. func TestInactiveDownloader(t *testing.T) { - tester := newTester(t, nil, nil) + tester := newTester() // Check that neither hashes nor blocks are accepted if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { @@ -232,11 +237,11 @@ func TestCancel(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if !tester.downloader.Cancel() { @@ -260,13 +265,13 @@ func TestThrottling(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Start a synchronisation concurrently errc := make(chan error) go func() { - errc <- tester.sync("peer", hashes[0]) + errc <- tester.downloader.synchronise("peer", hashes[0]) }() // Iteratively take some blocks, always checking the retrieval count for total := 0; total < targetBlocks; { @@ -303,9 +308,9 @@ func TestNonExistingParentAttack(t *testing.T) { forged.ParentHeaderHash = unknownHash // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, blocks) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if err := tester.sync("attack", hashes[0]); err != nil { + tester := newTester() + tester.newPeer("attack", hashes, blocks) + if err := tester.downloader.synchronise("attack", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs := tester.downloader.TakeBlocks() @@ -319,8 +324,8 @@ func TestNonExistingParentAttack(t *testing.T) { // Reconstruct a valid chain, and try to synchronize with it forged.ParentHeaderHash = knownHash - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs = tester.downloader.TakeBlocks() @@ -341,12 +346,12 @@ func TestRepeatingHashAttack(t *testing.T) { forged := hashes[:len(hashes)-1] // Try and sync with the malicious node - tester := newTester(t, forged, blocks) - tester.newPeer("attack", big.NewInt(10000), forged[0]) + tester := newTester() + tester.newPeer("attack", forged, blocks) errc := make(chan error) go func() { - errc <- tester.sync("attack", hashes[0]) + errc <- tester.downloader.synchronise("attack", hashes[0]) }() // Make sure that syncing returns and does so with a failure @@ -359,9 +364,8 @@ func TestRepeatingHashAttack(t *testing.T) { } } // Ensure that a valid chain can still pass sync - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -377,15 +381,15 @@ func TestNonExistingBlockAttack(t *testing.T) { hashes[len(hashes)/2] = unknownHash // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, blocks) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable { + tester := newTester() + tester.newPeer("attack", hashes, blocks) + if err := tester.downloader.synchronise("attack", hashes[0]); err != errPeersUnavailable { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) } // Ensure that a valid chain can still pass sync hashes[len(hashes)/2] = origin - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -408,14 +412,13 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(reverse[blockCacheLimit:], chunk2) // Try and sync with the malicious node and check that it fails - tester := newTester(t, reverse, blocks) - tester.newPeer("attack", big.NewInt(10000), reverse[0]) + tester := newTester() + tester.newPeer("attack", reverse, blocks) if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -431,8 +434,8 @@ func TestMadeupHashChainAttack(t *testing.T) { hashes := createHashes(0, 1024*blockCacheLimit) // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, nil) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("attack", hashes, nil) if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -445,11 +448,11 @@ func TestMadeupHashChainAttack(t *testing.T) { func TestMadeupHashChainDrippingAttack(t *testing.T) { // Create a random chain of hashes to drip hashes := createHashes(0, 16*blockCacheLimit) - tester := newTester(t, hashes, nil) + tester := newTester() // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester.newPeer("attack", hashes, nil) if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -473,8 +476,8 @@ func TestMadeupBlockChainAttack(t *testing.T) { gapped[i] = hashes[2*i] } // Try and sync with the malicious node and check that it fails - tester := newTester(t, gapped, blocks) - tester.newPeer("attack", big.NewInt(10000), gapped[0]) + tester := newTester() + tester.newPeer("attack", gapped, blocks) if _, err := tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -482,8 +485,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -507,8 +509,8 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { block.ParentHeaderHash = hash // Simulate pointing to already known hash } // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, forges) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("attack", hashes, forges) if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -516,8 +518,7 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.blocks = blocks - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -534,12 +535,12 @@ func TestBannedChainStarvationAttack(t *testing.T) { blocks := createBlocksFromHashes(hashes) // Create the tester and ban the selected hash - tester := newTester(t, hashes, blocks) + tester := newTester() tester.downloader.banned.Add(bannedHash) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester.newPeer("attack", hashes, blocks) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { @@ -556,7 +557,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", big.NewInt(10000), hashes[0]); err != errBannedHead { + if err := tester.newPeer("new attacker", hashes, blocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("net attacker"); peer != nil { @@ -579,12 +580,12 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { blocks := createBlocksFromHashes(hashes) // Create the tester and ban the selected hash - tester := newTester(t, hashes, blocks) + tester := newTester() tester.downloader.banned.Add(bannedHash) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester.newPeer("attack", hashes, blocks) for { // Try to sync with the attacker, check hash chain failure if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {