major rewrite, reorg of blockpool + new features

- blockpool moves to its own package
- uses errs pkg for its own coded errors
- publicly settable config of params (time intervals and batchsizes)
- test helpers in subpackage
- optional TD in blocks used now to update peers chain info
- major improvement in algorithm
- fix fragility and sync/parallelisation bugs
- implement status for reporting on sync status (peers/hashes/blocks etc)
- several tests added and further corner cases covered
This commit is contained in:
zelig 2015-02-25 19:34:12 +07:00
parent d46c7bcaf9
commit 422490d75c
13 changed files with 3583 additions and 0 deletions

749
blockpool/blockpool.go Normal file

@ -0,0 +1,749 @@
package blockpool
import (
"bytes"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
ethlogger "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/pow"
)
var plog = ethlogger.NewLogger("Blockpool")
var (
// max number of block hashes sent in one request
blockHashesBatchSize = 512
// max number of blocks sent in one request
blockBatchSize = 64
// interval between two consecutive block checks (and requests)
blocksRequestInterval = 3 * time.Second
// level of redundancy in block requests sent
blocksRequestRepetition = 1
// interval between two consecutive block hash checks (and requests)
blockHashesRequestInterval = 3 * time.Second
// max number of idle iterations, ie., check through a section without new blocks coming in
blocksRequestMaxIdleRounds = 100
// timeout interval: max time allowed for peer without sending a block hash
blockHashesTimeout = 60 * time.Second
// timeout interval: max time allowed for peer without sending a block
blocksTimeout = 120 * time.Second
)
// config embedded in components, by default fall back to constants
// by default all resolved to local
type Config struct {
BlockHashesBatchSize int
BlockBatchSize int
BlocksRequestRepetition int
BlocksRequestMaxIdleRounds int
BlockHashesRequestInterval time.Duration
BlocksRequestInterval time.Duration
BlockHashesTimeout time.Duration
BlocksTimeout time.Duration
}
// blockpool errors
const (
ErrInvalidBlock = iota
ErrInvalidPoW
ErrUnrequestedBlock
ErrInsufficientChainInfo
)
var errorToString = map[int]string{
ErrInvalidBlock: "Invalid block",
ErrInvalidPoW: "Invalid PoW",
ErrUnrequestedBlock: "Unrequested block",
ErrInsufficientChainInfo: "Insufficient chain info",
}
// init initialises all your laundry
func (self *Config) init() {
if self.BlockHashesBatchSize == 0 {
self.BlockHashesBatchSize = blockHashesBatchSize
}
if self.BlockBatchSize == 0 {
self.BlockBatchSize = blockBatchSize
}
if self.BlocksRequestRepetition == 0 {
self.BlocksRequestRepetition = blocksRequestRepetition
}
if self.BlocksRequestMaxIdleRounds == 0 {
self.BlocksRequestMaxIdleRounds = blocksRequestMaxIdleRounds
}
if self.BlockHashesRequestInterval == 0 {
self.BlockHashesRequestInterval = blockHashesRequestInterval
}
if self.BlocksRequestInterval == 0 {
self.BlocksRequestInterval = blocksRequestInterval
}
if self.BlockHashesTimeout == 0 {
self.BlockHashesTimeout = blockHashesTimeout
}
if self.BlocksTimeout == 0 {
self.BlocksTimeout = blocksTimeout
}
}
// node is the basic unit of the internal model of block chain/tree in the blockpool
type node struct {
lock sync.RWMutex
hash []byte
block *types.Block
hashBy string
blockBy string
td *big.Int
}
type index struct {
int
}
// entry is the struct kept and indexed in the pool
type entry struct {
node *node
section *section
index *index
}
type BlockPool struct {
Config *Config
// the minimal interface with blockchain
hasBlock func(hash []byte) bool
insertChain func(types.Blocks) error
verifyPoW func(pow.Block) bool
pool map[string]*entry
peers *peers
lock sync.RWMutex
chainLock sync.RWMutex
// alloc-easy pool of hash slices
hashSlicePool chan [][]byte
status *status
quit chan bool
wg sync.WaitGroup
running bool
}
// public constructor
func New(
hasBlock func(hash []byte) bool,
insertChain func(types.Blocks) error,
verifyPoW func(pow.Block) bool,
) *BlockPool {
return &BlockPool{
Config: &Config{},
hasBlock: hasBlock,
insertChain: insertChain,
verifyPoW: verifyPoW,
}
}
// allows restart
func (self *BlockPool) Start() {
self.lock.Lock()
defer self.lock.Unlock()
if self.running {
return
}
self.Config.init()
self.hashSlicePool = make(chan [][]byte, 150)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[string]*entry)
self.running = true
self.peers = &peers{
errors: &errs.Errors{
Package: "Blockpool",
Errors: errorToString,
},
peers: make(map[string]*peer),
status: self.status,
bp: self,
}
timer := time.NewTicker(3 * time.Second)
go func() {
for {
select {
case <-self.quit:
return
case <-timer.C:
plog.Debugf("status:\n%v", self.Status())
}
}
}()
plog.Infoln("Started")
}
func (self *BlockPool) Stop() {
self.lock.Lock()
if !self.running {
self.lock.Unlock()
return
}
self.running = false
self.lock.Unlock()
plog.Infoln("Stopping...")
close(self.quit)
self.lock.Lock()
self.peers = nil
self.pool = nil
self.lock.Unlock()
plog.Infoln("Stopped")
}
// Wait blocks until active processes finish
func (self *BlockPool) Wait(t time.Duration) {
self.lock.Lock()
if !self.running {
self.lock.Unlock()
return
}
self.lock.Unlock()
plog.Infoln("Waiting for processes to complete...")
w := make(chan bool)
go func() {
self.wg.Wait()
close(w)
}()
select {
case <-w:
plog.Infoln("Processes complete")
case <-time.After(t):
plog.Warnf("Timeout")
}
}
/*
AddPeer is called by the eth protocol instance running on the peer after
the status message has been received with total difficulty and current block hash
Called a second time with the same peer id, it is used to update chain info for a peer. This is used when a new (mined) block message is received.
RemovePeer needs to be called when the peer disconnects.
Peer info is currently not persisted across disconnects (or sessions)
*/
func (self *BlockPool) AddPeer(
td *big.Int, currentBlockHash []byte,
peerId string,
requestBlockHashes func([]byte) error,
requestBlocks func([][]byte) error,
peerError func(*errs.Error),
) (best bool) {
return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError)
}
// RemovePeer needs to be called when the peer disconnects
func (self *BlockPool) RemovePeer(peerId string) {
self.peers.removePeer(peerId)
}
/*
AddBlockHashes
Entry point for eth protocol to add block hashes received via BlockHashesMsg
only hashes from the best peer are handled
initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer)
launches all block request processes on each chain section
the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
*/
func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
bestpeer, best := self.peers.getPeer(peerId)
if !best {
return
}
// bestpeer is still the best peer
self.wg.Add(1)
defer func() { self.wg.Done() }()
self.status.lock.Lock()
self.status.activePeers[bestpeer.id]++
self.status.lock.Unlock()
var n int
var hash []byte
var ok, headSection, peerswitch bool
var sec, child, parent *section
var entry *entry
var nodes []*node
hash, ok = next()
bestpeer.lock.Lock()
plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash))
// first check if we are building the head section of a peer's chain
if bytes.Equal(bestpeer.parentHash, hash) {
if self.hasBlock(bestpeer.currentBlockHash) {
return
}
/*
when peer is promoted in switchPeer, a new header section process is launched
as the head section skeleton is actually created here, it is signaled to the process
so that it can quit
in the special case that the node for parent of the head block is found in the blockpool
(with or without fetched block)
*/
headSection = true
if entry := self.get(bestpeer.currentBlockHash); entry == nil {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash))
// if head block is not yet in the pool, create entry and start node list for section
node := &node{
hash: bestpeer.currentBlockHash,
block: bestpeer.currentBlock,
hashBy: peerId,
blockBy: peerId,
}
// nodes is a list of nodes in one section ordered top-bottom (old to young)
nodes = append(nodes, node)
n++
} else {
// otherwise set child section iff found node is the root of a section
// this is a possible scenario when a singleton head section was created
// on an earlier occasion this peer or another with the same block was best peer
if entry.node == entry.section.bottom {
child = entry.section
plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash))
}
}
} else {
// otherwise : we are not building the head section of the peer
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash))
}
// the switch channel signals peerswitch event
switchC := bestpeer.switchC
bestpeer.lock.Unlock()
// iterate over hashes coming from peer (first round we have hash set above)
LOOP:
for ; ok; hash, ok = next() {
select {
case <-self.quit:
// global quit for blockpool
return
case <-switchC:
// if the peer is demoted, no more hashes read
plog.DebugDetailf("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash))
peerswitch = true
break LOOP
default:
}
// if we reach the blockchain we stop reading more
if self.hasBlock(hash) {
// check if known block connecting the downloaded chain to our blockchain
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
if len(nodes) == 1 {
// create new section if needed and push it to the blockchain
sec = self.newSection(nodes)
sec.addSectionToBlockChain(bestpeer)
} else {
/*
not added hash yet but according to peer child section built
earlier chain connects with blockchain
this maybe a potential vulnarability
the root block arrives (or already there but its parenthash was not pointing to known block in the blockchain)
we start inserting -> error -> remove the entire chain
instead of punishing this peer
solution: when switching peers always make sure best peers own head block
and td together with blockBy are recorded on the node
*/
if len(nodes) == 0 && child != nil {
child.addSectionToBlockChain(bestpeer)
}
}
break LOOP
}
// look up node in the pool
entry = self.get(hash)
if entry != nil {
// reached a known chain in the pool
if entry.node == entry.section.bottom && n == 1 {
/*
the first block hash received is an orphan in the pool
this also supports clients that (despite the spec) include <from> hash in their
response to hashes request. Note that by providing <from> we can link sections
without having to wait for the root block of the child section to arrive, so it allows for superior performance
*/
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
// record the entry's chain section as child section
child = entry.section
continue LOOP
}
// otherwise record entry's chain section as parent connecting it to the pool
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
parent = entry.section
break LOOP
}
// finally if node for block hash does not exist, create it and append node to section nodes
node := &node{
hash: hash,
hashBy: peerId,
}
nodes = append(nodes, node)
} //for
/*
we got here if
- run out of hashes (parent = nil) sent by our best peer
- our peer is demoted (peerswitch = true)
- reached blockchain or blockpool
- quitting
*/
self.chainLock.Lock()
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes))
/*
handle forks where connecting node is mid-section
by splitting section at fork
no splitting needed if connecting node is head of a section
*/
if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash))
self.splitSection(parent, entry)
self.status.lock.Lock()
self.status.values.Forks++
self.status.lock.Unlock()
}
/*
if new section is created, link it to parent/child sections
and launch section process fetching blocks and further hashes
*/
sec = self.linkSections(nodes, parent, child)
self.status.lock.Lock()
self.status.values.BlockHashes += len(nodes)
self.status.lock.Unlock()
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
self.chainLock.Unlock()
/*
if a blockpool node is reached (parent section is not nil),
activate section (unless our peer is demoted by now).
this can be the bottom half of a newly split section in case of a fork.
bestPeer is nil if we got here after our peer got demoted while processing.
in this case no activation should happen
*/
if parent != nil && !peerswitch {
self.activateChain(parent, bestpeer, nil)
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent))
}
/*
if a new section was created,
register section iff head section or no child known
activate it with this peer
*/
if sec != nil {
// switch on section process (it is paused by switchC)
if !peerswitch {
if headSection || child == nil {
bestpeer.lock.Lock()
bestpeer.sections = append(bestpeer.sections, sec.top.hash)
bestpeer.lock.Unlock()
}
/*
request next block hashes for parent section here.
but only once, repeating only when bottom block arrives,
otherwise no way to check if it arrived
*/
bestpeer.requestBlockHashes(sec.bottom.hash)
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
sec.activate(bestpeer)
} else {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
sec.deactivate()
}
}
// if we are processing peer's head section, signal it to headSection process that it is created
if headSection {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash))
var headSec *section
switch {
case sec != nil:
headSec = sec
case child != nil:
headSec = child
default:
headSec = parent
}
if !peerswitch {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec))
bestpeer.headSectionC <- headSec
}
}
}
/*
AddBlock is the entry point for the eth protocol to call when blockMsg is received.
It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error
At the same time it is opportunistic in that if a requested block may be provided by any peer.
The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit.
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()
entry := self.get(hash)
// a peer's current head block is appearing the first time
if bytes.Equal(hash, sender.currentBlockHash) {
if sender.currentBlock == nil {
plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.setChainInfoFromBlock(block)
// sender.currentBlockC <- block
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash))
}
} else {
plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.lock.Lock()
// update peer chain info if more recent than what we registered
if block.Td != nil && block.Td.Cmp(sender.td) > 0 {
sender.td = block.Td
sender.currentBlockHash = block.Hash()
sender.parentHash = block.ParentHash()
sender.currentBlock = block
sender.headSection = nil
}
sender.lock.Unlock()
if entry == nil {
// penalise peer for sending what we have not asked
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
}
if entry == nil {
return
}
node := entry.node
node.lock.Lock()
defer node.lock.Unlock()
// check if block already present
if node.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
return
}
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
return
}
// validate block for PoW
if !self.verifyPoW(block) {
plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
node.block = block
node.blockBy = peerId
node.td = block.Td // optional field
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
}
/*
iterates down a chain section by section
activating section process on incomplete sections with peer
relinking orphaned sections with their parent if root block (and its parent hash) is known)
*/
func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) {
p.lock.RLock()
switchC := p.switchC
p.lock.RUnlock()
var i int
LOOP:
for sec != nil {
parent := self.getParent(sec)
plog.DebugDetailf("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id)
sec.activate(p)
if i > 0 && connected != nil {
connected[string(sec.top.hash)] = sec
}
/*
we need to relink both complete and incomplete sections
the latter could have been blockHashesRequestsComplete before being delinked from its parent
*/
if parent == nil && sec.bottom.block != nil {
if entry := self.get(sec.bottom.block.ParentHash()); entry != nil {
parent = entry.section
plog.DebugDetailf("activateChain: [%s]-[%s] relink", sectionhex(parent), sectionhex(sec))
link(parent, sec)
}
}
sec = parent
// stop if peer got demoted
select {
case <-switchC:
break LOOP
case <-self.quit:
break LOOP
default:
}
}
}
// must run in separate go routine, otherwise
// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock
func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
self.wg.Add(1)
go func() {
self.peers.requestBlocks(attempts, hashes)
self.wg.Done()
}()
}
// convenience methods to access adjacent sections
func (self *BlockPool) getParent(sec *section) *section {
self.chainLock.RLock()
defer self.chainLock.RUnlock()
return sec.parent
}
func (self *BlockPool) getChild(sec *section) *section {
self.chainLock.RLock()
defer self.chainLock.RUnlock()
return sec.child
}
// accessor and setter for entries in the pool
func (self *BlockPool) get(hash []byte) *entry {
self.lock.RLock()
defer self.lock.RUnlock()
return self.pool[string(hash)]
}
func (self *BlockPool) set(hash []byte, e *entry) {
self.lock.Lock()
defer self.lock.Unlock()
self.pool[string(hash)] = e
}
func (self *BlockPool) remove(sec *section) {
// delete node entries from pool index under pool lock
self.lock.Lock()
defer self.lock.Unlock()
for _, node := range sec.nodes {
delete(self.pool, string(node.hash))
}
}
func (self *BlockPool) getHashSlice() (s [][]byte) {
select {
case s = <-self.hashSlicePool:
default:
s = make([][]byte, self.Config.BlockBatchSize)
}
return
}
// Return returns a Client to the pool.
func (self *BlockPool) putHashSlice(s [][]byte) {
if len(s) == self.Config.BlockBatchSize {
select {
case self.hashSlicePool <- s:
default:
}
}
}
// pretty prints hash (byte array) with first 4 bytes in hex
func hex(hash []byte) (name string) {
if hash == nil {
name = ""
} else {
name = fmt.Sprintf("%x", hash[:4])
}
return
}
// pretty prints a section using first 4 bytes in hex of bottom and top blockhash of the section
func sectionhex(section *section) (name string) {
if section == nil {
name = ""
} else {
name = fmt.Sprintf("%x-%x", section.bottom.hash[:4], section.top.hash[:4])
}
return
}

479
blockpool/blockpool_test.go Normal file

@ -0,0 +1,479 @@
package blockpool
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethutil"
)
func TestPeerWithKnownBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.refBlockChain[0] = nil
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer0 := blockPoolTester.newPeer("0", 1, 0)
peer0.AddPeer()
blockPool.Wait(waitTimeout)
blockPool.Stop()
// no request on known block
peer0.checkBlockHashesRequests()
}
func TestPeerWithKnownParentBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.initRefBlockChain(1)
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer0 := blockPoolTester.newPeer("0", 1, 1)
peer0.AddPeer()
peer0.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
peer0.checkBlocksRequests([]int{1})
peer0.checkBlockHashesRequests()
blockPoolTester.refBlockChain[1] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(4)
peer0 := blockPoolTester.newPeer("peer0", 2, 2)
peer1 := blockPoolTester.newPeer("peer1", 1, 1)
peer2 := blockPoolTester.newPeer("peer2", 3, 4)
blockPool.Start()
// pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.waitBlocksRequests(3)
blockPool.AddBlock(&types.Block{
HeaderHash: ethutil.Bytes(hashes[1]),
ParentHeaderHash: ethutil.Bytes(hashes[0]),
Td: ethutil.Big3,
}, "peer1")
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
}
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[4] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestSimpleChain(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(2)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 2)
peer1.AddPeer()
peer1.serveBlocks(1, 2)
go peer1.serveBlockHashes(2, 1, 0)
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[2] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestChainConnectingWithParentHash(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestMultiSectionChain(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(5)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 5)
peer1.AddPeer()
go peer1.serveBlocks(4, 5)
go peer1.serveBlockHashes(5, 4, 3)
go peer1.serveBlocks(2, 3, 4)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[5] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestNewBlocksOnPartialChain(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(7)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 5)
peer1.AddPeer()
go peer1.serveBlocks(4, 5) // partially complete section
go peer1.serveBlockHashes(5, 4, 3)
peer1.serveBlocks(3, 4) // partially complete section
// peer1 found new blocks
peer1.td = 2
peer1.currentBlock = 7
peer1.AddPeer()
peer1.sendBlocks(6, 7)
go peer1.serveBlockHashes(7, 6, 5)
go peer1.serveBlocks(2, 3)
go peer1.serveBlocks(5, 6)
go peer1.serveBlockHashes(3, 2, 1) // tests that hash request from known chain root is remembered
peer1.serveBlocks(0, 1, 2)
// blockPool.RemovePeer("peer1")
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[7] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchUp(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(7)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 6)
peer2 := blockPoolTester.newPeer("peer2", 2, 7)
peer1.AddPeer()
go peer1.serveBlocks(5, 6)
go peer1.serveBlockHashes(6, 5, 4, 3) //
peer1.serveBlocks(2, 3) // section partially complete, block 3 will be preserved after peer demoted
peer2.AddPeer() // peer2 is promoted as best peer, peer1 is demoted
go peer2.serveBlocks(6, 7)
// go peer2.serveBlockHashes(7, 6) //
go peer2.serveBlocks(4, 5) // tests that block request for earlier section is remembered
go peer1.serveBlocks(3, 4) // tests that connecting section by demoted peer is remembered and blocks are accepted from demoted peer
go peer2.serveBlockHashes(3, 2, 1, 0) // tests that known chain section is activated, hash requests from 3 is remembered
peer2.serveBlocks(0, 1, 2) // final blocks linking to blockchain sent
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[7] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownOverlapSectionWithoutRootBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 4)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
peer2.serveBlockHashes(6, 5, 4) // no go: make sure skeleton is created
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlockHashes(4, 3, 2, 1, 0) //
go peer1.serveBlocks(3, 4) //
go peer1.serveBlocks(4, 5) // tests that section set by demoted peer is remembered and blocks are accepted from new peer if they have it even if peers original TD is lower
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownOverlapSectionWithRootBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 4)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
go peer2.serveBlockHashes(6, 5, 4) //
peer2.serveBlocks(3, 4) // !incomplete section
time.Sleep(100 * time.Millisecond) // make sure block 4 added
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlockHashes(4, 3, 2, 1, 0) // tests that hash request are directly connecting if the head block exists
go peer1.serveBlocks(4, 5) // tests that section set by demoted peer is remembered and blocks are accepted from new peer if they have it even if peers original TD is lower
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownDisjointSection(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
go peer2.serveBlockHashes(6, 5, 4) //
peer2.serveBlocks(3, 4, 5) //
time.Sleep(100 * time.Millisecond) // make sure blocks are received
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlocks(2, 3) //
go peer1.serveBlockHashes(3, 2, 1) //
peer1.serveBlocks(0, 1, 2) //
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchBack(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(8)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 2, 11)
peer2 := blockPoolTester.newPeer("peer2", 1, 8)
peer2.AddPeer()
go peer2.serveBlocks(7, 8)
go peer2.serveBlockHashes(8, 7, 6)
go peer2.serveBlockHashes(6, 5, 4)
peer2.serveBlocks(4, 5) // section partially complete
peer1.AddPeer() // peer1 is promoted as best peer
go peer1.serveBlocks(10, 11) //
peer1.serveBlockHashes(11, 10) // only gives useless results
blockPool.RemovePeer("peer1") // peer1 disconnects
go peer2.serveBlockHashes(4, 3, 2, 1, 0) // tests that asking for hashes from 4 is remembered
go peer2.serveBlocks(3, 4, 5, 6, 7, 8) // tests that section 4, 5, 6 and 7, 8 are remembered for missing blocks
peer2.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[8] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSimple(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 9)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7, 3, 2)
peer1.serveBlocks(1, 2, 3, 7, 8)
peer2.AddPeer() // peer2 is promoted as best peer
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // fork on 3 -> 4 (earlier child: 7)
go peer2.serveBlocks(1, 2, 3, 4, 5)
go peer2.serveBlockHashes(2, 1, 0)
peer2.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{}
blockPoolTester.refBlockChain[3] = []int{4}
delete(blockPoolTester.refBlockChain, 7)
delete(blockPoolTester.refBlockChain, 8)
delete(blockPoolTester.refBlockChain, 9)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSwitchBackByNewBlocks(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(11)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 9)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9) //
go peer1.serveBlockHashes(9, 8, 7, 3, 2) //
peer1.serveBlocks(7, 8) // partial section
// time.Sleep(1 * time.Second)
peer2.AddPeer() //
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(1, 2, 3, 4, 5) //
// peer1 finds new blocks
peer1.td = 3
peer1.currentBlock = 11
peer1.AddPeer()
go peer1.serveBlocks(10, 11)
go peer1.serveBlockHashes(11, 10, 9)
go peer1.serveBlocks(9, 10)
// time.Sleep(1 * time.Second)
go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered
go peer1.serveBlockHashes(2, 1) // tests that hash request from root of connecting chain section (added by demoted peer) is remembered
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[11] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSwitchBackByPeerSwitchBack(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 9)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7, 3, 2)
peer1.serveBlocks(7, 8)
peer2.AddPeer()
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(2, 3, 4, 5) //
blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer
go peer1.serveBlocks(1, 2) //
go peer1.serveBlockHashes(2, 1, 0) //
go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered and orphan section relinks to existing parent block
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[9] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 9)
peer2 := blockPoolTester.newPeer("peer2", 2, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7)
peer1.serveBlocks(3, 7, 8) // make sure this section is complete
time.Sleep(1 * time.Second)
go peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary
peer1.serveBlocks(2, 3) // partially complete sections block 2 missing
peer2.AddPeer() //
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(2, 3, 4, 5) // block 2 still missing.
blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer
// peer1.serveBlockHashes(7, 3) // tests that hash request from fork root is remembered even though section process completed
go peer1.serveBlockHashes(2, 1, 0) //
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[9] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}

@ -0,0 +1,350 @@
package blockpool
import (
"fmt"
"math/big"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/pow"
)
var (
waitTimeout = 60 * time.Second
testBlockHashesRequestInterval = 10 * time.Millisecond
testBlocksRequestInterval = 10 * time.Millisecond
requestWatchInterval = 10 * time.Millisecond
)
// test blockChain is an integer trie
type blockChain map[int][]int
// blockPoolTester provides the interface between tests and a blockPool
//
// refBlockChain is used to guide which blocks will be accepted as valid
// blockChain gives the current state of the blockchain and
// accumulates inserts so that we can check the resulting chain
type blockPoolTester struct {
hashPool *test.TestHashPool
lock sync.RWMutex
reqlock sync.RWMutex
blocksRequestsMap map[int]bool
refBlockChain blockChain
blockChain blockChain
blockPool *BlockPool
t *testing.T
}
func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *BlockPool, b *blockPoolTester) {
hashPool = test.NewHashPool()
b = &blockPoolTester{
t: t,
hashPool: hashPool,
blockChain: make(blockChain),
refBlockChain: make(blockChain),
blocksRequestsMap: make(map[int]bool),
}
b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW)
blockPool = b.blockPool
blockPool.Config.BlockHashesRequestInterval = testBlockHashesRequestInterval
blockPool.Config.BlocksRequestInterval = testBlocksRequestInterval
return
}
func (self *blockPoolTester) Errorf(format string, params ...interface{}) {
fmt.Printf(format+"\n", params...)
self.t.Errorf(format, params...)
}
// blockPoolTester implements the 3 callbacks needed by the blockPool:
// hasBlock, insetChain, verifyPoW
func (self *blockPoolTester) hasBlock(block []byte) (ok bool) {
self.lock.RLock()
defer self.lock.RUnlock()
indexes := self.hashPool.HashesToIndexes([][]byte{block})
i := indexes[0]
_, ok = self.blockChain[i]
fmt.Printf("has block %v (%x...): %v\n", i, block[0:4], ok)
return
}
func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
self.lock.Lock()
defer self.lock.Unlock()
var parent, child int
var children, refChildren []int
var ok bool
for _, block := range blocks {
child = self.hashPool.HashesToIndexes([][]byte{block.Hash()})[0]
_, ok = self.blockChain[child]
if ok {
fmt.Printf("block %v already in blockchain\n", child)
continue // already in chain
}
parent = self.hashPool.HashesToIndexes([][]byte{block.ParentHeaderHash})[0]
children, ok = self.blockChain[parent]
if !ok {
return fmt.Errorf("parent %v not in blockchain ", parent)
}
ok = false
var found bool
refChildren, found = self.refBlockChain[parent]
if found {
for _, c := range refChildren {
if c == child {
ok = true
}
}
if !ok {
return fmt.Errorf("invalid block %v", child)
}
} else {
ok = true
}
if ok {
// accept any blocks if parent not in refBlockChain
fmt.Errorf("blockchain insert %v -> %v\n", parent, child)
self.blockChain[parent] = append(children, child)
self.blockChain[child] = nil
}
}
return nil
}
func (self *blockPoolTester) verifyPoW(pblock pow.Block) bool {
return true
}
// test helper that compares the resulting blockChain to the desired blockChain
func (self *blockPoolTester) checkBlockChain(blockChain map[int][]int) {
self.lock.RLock()
defer self.lock.RUnlock()
for k, v := range self.blockChain {
fmt.Printf("got: %v -> %v\n", k, v)
}
for k, v := range blockChain {
fmt.Printf("expected: %v -> %v\n", k, v)
}
if len(blockChain) != len(self.blockChain) {
self.Errorf("blockchain incorrect (zlength differ)")
}
for k, v := range blockChain {
vv, ok := self.blockChain[k]
if !ok || !test.ArrayEq(v, vv) {
self.Errorf("blockchain incorrect on %v -> %v (!= %v)", k, vv, v)
}
}
}
//
// peerTester provides the peer callbacks for the blockPool
// it registers actual callbacks so that the result can be compared to desired behaviour
// provides helper functions to mock the protocol calls to the blockPool
type peerTester struct {
blockHashesRequests []int
blocksRequests [][]int
blocksRequestsMap map[int]bool
peerErrors []int
blockPool *BlockPool
hashPool *test.TestHashPool
lock sync.RWMutex
bt *blockPoolTester
id string
td int
currentBlock int
t *testing.T
}
// peerTester constructor takes hashPool and blockPool from the blockPoolTester
func (self *blockPoolTester) newPeer(id string, td int, cb int) *peerTester {
return &peerTester{
id: id,
td: td,
currentBlock: cb,
hashPool: self.hashPool,
blockPool: self.blockPool,
t: self.t,
bt: self,
blocksRequestsMap: self.blocksRequestsMap,
}
}
func (self *peerTester) Errorf(format string, params ...interface{}) {
fmt.Printf(format+"\n", params...)
self.t.Errorf(format, params...)
}
// helper to compare actual and expected block requests
func (self *peerTester) checkBlocksRequests(blocksRequests ...[]int) {
if len(blocksRequests) > len(self.blocksRequests) {
self.Errorf("blocks requests incorrect (length differ)\ngot %v\nexpected %v", self.blocksRequests, blocksRequests)
} else {
for i, rr := range blocksRequests {
r := self.blocksRequests[i]
if !test.ArrayEq(r, rr) {
self.Errorf("blocks requests incorrect\ngot %v\nexpected %v", self.blocksRequests, blocksRequests)
}
}
}
}
// helper to compare actual and expected block hash requests
func (self *peerTester) checkBlockHashesRequests(blocksHashesRequests ...int) {
rr := blocksHashesRequests
self.lock.RLock()
r := self.blockHashesRequests
self.lock.RUnlock()
if len(r) != len(rr) {
self.Errorf("block hashes requests incorrect (length differ)\ngot %v\nexpected %v", r, rr)
} else {
if !test.ArrayEq(r, rr) {
self.Errorf("block hashes requests incorrect\ngot %v\nexpected %v", r, rr)
}
}
}
// waiter function used by peer.serveBlocks
// blocking until requests appear
// since block requests are sent to any random peers
// block request map is shared between peers
// times out after waitTimeout
func (self *peerTester) waitBlocksRequests(blocksRequest ...int) {
timeout := time.After(waitTimeout)
rr := blocksRequest
for {
self.lock.RLock()
r := self.blocksRequestsMap
fmt.Printf("[%s] blocks request check %v (%v)\n", self.id, rr, r)
i := 0
for i = 0; i < len(rr); i++ {
_, ok := r[rr[i]]
if !ok {
break
}
}
self.lock.RUnlock()
if i == len(rr) {
return
}
time.Sleep(requestWatchInterval)
select {
case <-timeout:
default:
}
}
}
// waiter function used by peer.serveBlockHashes
// blocking until requests appear
// times out after a period
func (self *peerTester) waitBlockHashesRequests(blocksHashesRequest int) {
timeout := time.After(waitTimeout)
rr := blocksHashesRequest
for i := 0; ; {
self.lock.RLock()
r := self.blockHashesRequests
self.lock.RUnlock()
fmt.Printf("[%s] block hash request check %v (%v)\n", self.id, rr, r)
for ; i < len(r); i++ {
if rr == r[i] {
return
}
}
time.Sleep(requestWatchInterval)
select {
case <-timeout:
default:
}
}
}
// mocks a simple blockchain 0 (genesis) ... n (head)
func (self *blockPoolTester) initRefBlockChain(n int) {
for i := 0; i < n; i++ {
self.refBlockChain[i] = []int{i + 1}
}
}
// peerTester functions that mimic protocol calls to the blockpool
// registers the peer with the blockPool
func (self *peerTester) AddPeer() bool {
hash := self.hashPool.IndexesToHashes([]int{self.currentBlock})[0]
return self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
}
// peer sends blockhashes if and when gets a request
func (self *peerTester) serveBlockHashes(indexes ...int) {
fmt.Printf("ready to serve block hashes %v\n", indexes)
self.waitBlockHashesRequests(indexes[0])
self.sendBlockHashes(indexes...)
}
func (self *peerTester) sendBlockHashes(indexes ...int) {
fmt.Printf("adding block hashes %v\n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
i := 1
next := func() (hash []byte, ok bool) {
if i < len(hashes) {
hash = hashes[i]
ok = true
i++
}
return
}
self.blockPool.AddBlockHashes(next, self.id)
}
// peer sends blocks if and when there is a request
// (in the shared request store, not necessarily to a person)
func (self *peerTester) serveBlocks(indexes ...int) {
fmt.Printf("ready to serve blocks %v\n", indexes[1:])
self.waitBlocksRequests(indexes[1:]...)
self.sendBlocks(indexes...)
}
func (self *peerTester) sendBlocks(indexes ...int) {
fmt.Printf("adding blocks %v \n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
for i := 1; i < len(hashes); i++ {
fmt.Printf("adding block %v %x\n", indexes[i], hashes[i][:4])
self.blockPool.AddBlock(&types.Block{HeaderHash: ethutil.Bytes(hashes[i]), ParentHeaderHash: ethutil.Bytes(hashes[i-1])}, self.id)
}
}
// peer callbacks
// -1 is special: not found (a hash never seen)
// records block hashes requests by the blockPool
func (self *peerTester) requestBlockHashes(hash []byte) error {
indexes := self.hashPool.HashesToIndexes([][]byte{hash})
fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4])
self.lock.Lock()
defer self.lock.Unlock()
self.blockHashesRequests = append(self.blockHashesRequests, indexes[0])
return nil
}
// records block requests by the blockPool
func (self *peerTester) requestBlocks(hashes [][]byte) error {
indexes := self.hashPool.HashesToIndexes(hashes)
fmt.Printf("blocks request %v %x...\n", indexes, hashes[0][:4])
self.bt.reqlock.Lock()
defer self.bt.reqlock.Unlock()
self.blocksRequests = append(self.blocksRequests, indexes)
for _, i := range indexes {
self.blocksRequestsMap[i] = true
}
return nil
}
// records the error codes of all the peerErrors found the blockPool
func (self *peerTester) peerError(err *errs.Error) {
self.peerErrors = append(self.peerErrors, err.Code)
}

40
blockpool/config_test.go Normal file

@ -0,0 +1,40 @@
package blockpool
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
)
func TestBlockPoolConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}}
blockPool.Start()
c := blockPool.Config
test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, blockHashesBatchSize, t)
test.CheckInt("BlockBatchSize", c.BlockBatchSize, blockBatchSize, t)
test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t)
test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t)
test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, blockHashesRequestInterval, t)
test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, blocksRequestInterval, t)
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
}
func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}}
c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0}
blockPool.Config = c
blockPool.Start()
test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, 128, t)
test.CheckInt("BlockBatchSize", c.BlockBatchSize, 32, t)
test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t)
test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t)
test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, 300*time.Millisecond, t)
test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, 100*time.Millisecond, t)
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
}

124
blockpool/errors_test.go Normal file

@ -0,0 +1,124 @@
package blockpool
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow"
)
func TestInvalidBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(2)
blockPoolTester.refBlockChain[2] = []int{}
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[2] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInvalidBlock {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrInvalidBlock)
}
} else {
t.Errorf("expected %v error, got %v", ErrInvalidBlock, peer1.peerErrors)
}
}
func TestVerifyPoW(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
first := false
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
bb, _ := b.(*types.Block)
indexes := blockPoolTester.hashPool.HashesToIndexes([][]byte{bb.Hash()})
if indexes[0] == 2 && !first {
first = true
return false
} else {
return true
}
}
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer2 := blockPoolTester.newPeer("peer2", 1, 3)
peer1.AddPeer()
peer2.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2, 3)
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
return true
}
peer2.serveBlocks(1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInvalidPoW {
t.Errorf("wrong error, expected %v, got %v", ErrInvalidPoW, peer1.peerErrors[0])
}
} else {
t.Errorf("expected %v error, got %v", ErrInvalidPoW, peer1.peerErrors)
}
}
func TestUnrequestedBlock(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
peer1.sendBlocks(1, 2)
// blockPool.Wait(waitTimeout)
blockPool.Stop()
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrUnrequestedBlock {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrUnrequestedBlock)
}
} else {
t.Errorf("expected %v error, got %v", ErrUnrequestedBlock, peer1.peerErrors)
}
}
func TestErrInsufficientChainInfo(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPool.Config.BlockHashesTimeout = 100 * time.Millisecond
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
blockPool.Wait(waitTimeout)
blockPool.Stop()
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInsufficientChainInfo {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrInsufficientChainInfo)
}
} else {
t.Errorf("expected %v error, got %v", ErrInsufficientChainInfo, peer1.peerErrors)
}
}

536
blockpool/peers.go Normal file

@ -0,0 +1,536 @@
package blockpool
import (
"bytes"
"math/big"
"math/rand"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/ethutil"
)
type peer struct {
lock sync.RWMutex
// last known blockchain status
td *big.Int
currentBlockHash []byte
currentBlock *types.Block
parentHash []byte
headSection *section
id string
// peer callbacks
requestBlockHashes func([]byte) error
requestBlocks func([][]byte) error
peerError func(*errs.Error)
errors *errs.Errors
sections [][]byte
// channels to push new head block and head section for peer a
currentBlockC chan *types.Block
headSectionC chan *section
// channels to signal peers witch and peer quit
idleC chan bool
switchC chan bool
quit chan bool
bp *BlockPool
// timers for head section process
blockHashesRequestTimer <-chan time.Time
blocksRequestTimer <-chan time.Time
suicide <-chan time.Time
idle bool
}
// peers is the component keeping a record of peers in a hashmap
//
type peers struct {
lock sync.RWMutex
bp *BlockPool
errors *errs.Errors
peers map[string]*peer
best *peer
status *status
}
// peer constructor
func (self *peers) newPeer(
td *big.Int,
currentBlockHash []byte,
id string,
requestBlockHashes func([]byte) error,
requestBlocks func([][]byte) error,
peerError func(*errs.Error),
) (p *peer) {
p = &peer{
errors: self.errors,
td: td,
currentBlockHash: currentBlockHash,
id: id,
requestBlockHashes: requestBlockHashes,
requestBlocks: requestBlocks,
peerError: peerError,
currentBlockC: make(chan *types.Block),
headSectionC: make(chan *section),
bp: self.bp,
idle: true,
}
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
// dispatches an error to a peer if still connected
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
defer self.lock.RUnlock()
peer, ok := self.peers[id]
if ok {
peer.addError(code, format, params)
}
// blacklisting comes here
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
}
func (self *peer) setChainInfo(td *big.Int, c []byte) {
self.lock.Lock()
defer self.lock.Unlock()
self.td = td
self.currentBlockHash = c
self.currentBlock = nil
self.parentHash = nil
self.headSection = nil
}
func (self *peer) setChainInfoFromBlock(block *types.Block) {
self.lock.Lock()
defer self.lock.Unlock()
// use the optional TD to update peer td, this helps second best peer selection
// in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 {
plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
self.td = block.Td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
self.headSection = nil
}
self.bp.wg.Add(1)
go func() {
self.currentBlockC <- block
self.bp.wg.Done()
}()
}
func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
// distribute block request among known peers
self.lock.RLock()
defer self.lock.RUnlock()
peerCount := len(self.peers)
// on first attempt use the best peer
if attempts == 0 {
plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
self.best.requestBlocks(hashes)
return
}
repetitions := self.bp.Config.BlocksRequestRepetition
if repetitions > peerCount {
repetitions = peerCount
}
i := 0
indexes := rand.Perm(peerCount)[0:repetitions]
sort.Ints(indexes)
plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
for _, peer := range self.peers {
if i == indexes[0] {
plog.DebugDetailf("request length: %v", len(hashes))
plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
peer.requestBlocks(hashes)
indexes = indexes[1:]
if len(indexes) == 0 {
break
}
}
i++
}
self.bp.putHashSlice(hashes)
}
// addPeer implements the logic for blockpool.AddPeer
// returns true iff peer is promoted as best peer in the pool
func (self *peers) addPeer(
td *big.Int,
currentBlockHash []byte,
id string,
requestBlockHashes func([]byte) error,
requestBlocks func([][]byte) error,
peerError func(*errs.Error),
) (best bool) {
var previousBlockHash []byte
self.lock.Lock()
p, found := self.peers[id]
if found {
if !bytes.Equal(p.currentBlockHash, currentBlockHash) {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash)
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
}
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
self.status.lock.Lock()
self.status.peers[id]++
self.status.values.NewBlocks++
self.status.lock.Unlock()
plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
}
self.lock.Unlock()
// check peer current head
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
return false
}
if self.best == p {
// new block update for active current best peer -> request hashes
plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
if previousBlockHash != nil {
if entry := self.bp.get(previousBlockHash); entry != nil {
p.headSectionC <- nil
self.bp.activateChain(entry.section, p, nil)
p.sections = append(p.sections, previousBlockHash)
}
}
best = true
} else {
currentTD := ethutil.Big0
if self.best != nil {
currentTD = self.best.td
}
if td.Cmp(currentTD) > 0 {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
plog.Debugf("addPeer: peer <%v> promoted best peer", id)
self.bp.switchPeer(self.best, p)
self.best = p
best = true
}
}
return
}
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string) {
self.lock.Lock()
defer self.lock.Unlock()
p, found := self.peers[id]
if !found {
return
}
delete(self.peers, id)
plog.Debugf("addPeer: remove peer <%v>", id)
// if current best peer is removed, need to find a better one
if self.best == p {
var newp *peer
// FIXME: own TD
max := ethutil.Big0
// peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers {
if pp.td.Cmp(max) > 0 {
max = pp.td
newp = pp
}
}
if newp != nil {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td)
} else {
plog.Warnln("addPeer: no suitable peers found")
}
self.best = newp
self.bp.switchPeer(p, newp)
}
}
// switchPeer launches section processes based on information about
// shared interest and legacy of peers
func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain
if oldp != nil {
plog.DebugDetailf("<%s> quit peer processes", oldp.id)
close(oldp.switchC)
}
if newp != nil {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
if newp.headSection == nil {
plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
if newp.idle {
self.wg.Add(1)
newp.idle = false
self.syncing()
}
go func() {
newp.run()
if !newp.idle {
self.wg.Done()
newp.idle = true
}
}()
}
var connected = make(map[string]*section)
var sections [][]byte
for _, hash := range newp.sections {
plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
// if section not connected (ie, top of a contiguous sequence of sections)
if connected[string(hash)] == nil {
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
if entry := self.get(hash); entry != nil {
self.activateChain(entry.section, newp, connected)
connected[string(hash)] = entry.section
sections = append(sections, hash)
}
}
}
plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
// need to lock now that newp is exposed to section processes
newp.lock.Lock()
newp.sections = sections
newp.lock.Unlock()
}
// finally deactivate section process for sections where newp didnt activate
// newp activating section process changes the quit channel for this reason
if oldp != nil {
plog.DebugDetailf("<%s> quit section processes", oldp.id)
//
close(oldp.idleC)
}
}
func (self *peers) getPeer(id string) (p *peer, best bool) {
self.lock.RLock()
defer self.lock.RUnlock()
if self.best != nil && self.best.id == id {
return self.best, true
}
p = self.peers[id]
return
}
func (self *peer) handleSection(sec *section) {
self.lock.Lock()
defer self.lock.Unlock()
self.headSection = sec
self.blockHashesRequestTimer = nil
if sec == nil {
if self.idle {
self.idle = false
self.bp.wg.Add(1)
self.bp.syncing()
}
self.suicide = time.After(self.bp.Config.BlockHashesTimeout)
plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
} else {
if !self.idle {
self.idle = true
self.suicide = nil
self.bp.wg.Done()
}
plog.DebugDetailf("HeadSection: <%s> head section [%s] created", self.id, sectionhex(sec))
}
}
func (self *peer) getCurrentBlock(currentBlock *types.Block) {
// called by update or after AddBlock signals that head block of current peer is received
if currentBlock == nil {
if entry := self.bp.get(self.currentBlockHash); entry != nil {
entry.node.lock.Lock()
currentBlock = entry.node.block
entry.node.lock.Unlock()
}
if currentBlock != nil {
plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
} else {
plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
self.requestBlocks([][]byte{self.currentBlockHash})
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
return
}
} else {
plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
}
self.lock.Lock()
defer self.lock.Unlock()
self.currentBlock = currentBlock
self.parentHash = currentBlock.ParentHash()
plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: [%s])... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
self.blockHashesRequestTimer = time.After(0)
self.blocksRequestTimer = nil
}
func (self *peer) getBlockHashes() {
//if connecting parent is found
if self.bp.hasBlock(self.parentHash) {
plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))
if err != nil {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
}
} else {
if parent := self.bp.get(self.parentHash); parent != nil {
if self.bp.get(self.currentBlockHash) == nil {
plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
n := &node{
hash: self.currentBlockHash,
block: self.currentBlock,
hashBy: self.id,
blockBy: self.id,
}
self.bp.newSection([]*node{n}).activate(self)
} else {
plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
self.bp.activateChain(parent.section, self, nil)
}
} else {
plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
self.requestBlockHashes(self.currentBlockHash)
self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
return
}
}
self.blockHashesRequestTimer = nil
if !self.idle {
self.idle = true
self.suicide = nil
self.bp.wg.Done()
}
}
// main loop for head section process
func (self *peer) run() {
self.lock.RLock()
switchC := self.switchC
currentBlockHash := self.currentBlockHash
self.lock.RUnlock()
self.blockHashesRequestTimer = nil
self.blocksRequestTimer = time.After(0)
self.suicide = time.After(self.bp.Config.BlockHashesTimeout)
var quit chan bool
var ping = time.NewTicker(5 * time.Second)
LOOP:
for {
select {
case <-ping.C:
plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
// signal from AddBlockHashes that head section for current best peer is created
// if sec == nil, it signals that chain info has updated (new block message)
case sec := <-self.headSectionC:
self.handleSection(sec)
// local var quit channel is linked to sections suicide channel so that
if sec == nil {
quit = nil
} else {
quit = sec.suicideC
}
// periodic check for block hashes or parent block/section
case <-self.blockHashesRequestTimer:
self.getBlockHashes()
// signal from AddBlock that head block of current best peer has been received
case currentBlock := <-self.currentBlockC:
self.getCurrentBlock(currentBlock)
// keep requesting until found or timed out
case <-self.blocksRequestTimer:
self.getCurrentBlock(nil)
// quitting on timeout
case <-self.suicide:
self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block", currentBlockHash))
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
// there is no persistence here, so GC will just take care of cleaning up
break LOOP
// signal for peer switch, quit
case <-switchC:
var complete = "incomplete "
if self.idle {
complete = "complete"
}
plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
break LOOP
// global quit for blockpool
case <-self.bp.quit:
break LOOP
// quit
case <-quit:
break LOOP
}
}
if !self.idle {
self.idle = true
self.bp.wg.Done()
}
}

120
blockpool/peers_test.go Normal file

@ -0,0 +1,120 @@
package blockpool
import (
"math/big"
"testing"
"github.com/ethereum/go-ethereum/blockpool/test"
)
// the actual tests
func TestAddPeer(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
peer0 := blockPoolTester.newPeer("peer0", 1, 0)
peer1 := blockPoolTester.newPeer("peer1", 2, 1)
peer2 := blockPoolTester.newPeer("peer2", 3, 2)
var bestpeer *peer
blockPool.Start()
// pool
best := peer0.AddPeer()
if !best {
t.Errorf("peer0 (TD=1) not accepted as best")
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("peer0 (TD=1) not set as best")
}
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=3) not accepted as best")
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=3) not set as best")
}
peer2.waitBlocksRequests(2)
best = peer1.AddPeer()
if best {
t.Errorf("peer1 (TD=2) accepted as best")
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=3) not set any more as best")
}
if blockPool.peers.best.td.Cmp(big.NewInt(int64(3))) != 0 {
t.Errorf("peer1 TD not set")
}
peer2.td = 4
peer2.currentBlock = 3
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=4) not accepted as best")
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=4) not set as best")
}
if blockPool.peers.best.td.Cmp(big.NewInt(int64(4))) != 0 {
t.Errorf("peer2 TD not updated")
}
peer2.waitBlocksRequests(3)
peer1.td = 3
peer1.currentBlock = 2
best = peer1.AddPeer()
if best {
t.Errorf("peer1 (TD=3) should not be set as best")
}
if blockPool.peers.best.id == "peer1" {
t.Errorf("peer1 (TD=3) should not be set as best")
}
bestpeer, best = blockPool.peers.getPeer("peer1")
if bestpeer.td.Cmp(big.NewInt(int64(3))) != 0 {
t.Errorf("peer1 TD should be updated")
}
blockPool.RemovePeer("peer2")
bestpeer, best = blockPool.peers.getPeer("peer2")
if bestpeer != nil {
t.Errorf("peer2 not removed")
}
if blockPool.peers.best.id != "peer1" {
t.Errorf("existing peer1 (TD=3) should be set as best peer")
}
peer1.waitBlocksRequests(2)
blockPool.RemovePeer("peer1")
bestpeer, best = blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed")
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("existing peer0 (TD=1) should be set as best peer")
}
peer0.waitBlocksRequests(0)
blockPool.RemovePeer("peer0")
bestpeer, best = blockPool.peers.getPeer("peer0")
if bestpeer != nil {
t.Errorf("peer1 not removed")
}
// adding back earlier peer ok
peer0.currentBlock = 3
best = peer0.AddPeer()
if !best {
t.Errorf("peer0 (TD=1) should be set as best")
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("peer0 (TD=1) should be set as best")
}
peer0.waitBlocksRequests(3)
blockPool.Stop()
}

677
blockpool/section.go Normal file

@ -0,0 +1,677 @@
package blockpool
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
)
/*
section is the worker on each chain section in the block pool
- remove the section if there are blocks missing after an absolute time
- remove the section if there are maxIdleRounds of idle rounds of block requests with no response
- periodically polls the chain section for missing blocks which are then requested from peers
- registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they removed (inserted in blockchain, invalid or expired)
- when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
- when turned back on it recursively calls itself on the root of the next chain section
*/
type section struct {
lock sync.RWMutex
parent *section // connecting section back in time towards blockchain
child *section // connecting section forward in time
top *node // the topmost node = head node = youngest node within the chain section
bottom *node // the bottom node = root node = oldest node within the chain section
nodes []*node
peer *peer
parentHash []byte
blockHashes [][]byte
poolRootIndex int
bp *BlockPool
controlC chan *peer // to (de)register the current best peer
poolRootC chan *peer // indicate connectedness to blockchain (well, known blocks)
offC chan bool // closed if process terminated
suicideC chan bool // initiate suicide on the section
quitInitC chan bool // to signal end of initialisation
forkC chan chan bool // freeze section process while splitting
switchC chan bool // switching
idleC chan bool // channel to indicate thai food
processC chan *node //
missingC chan *node //
blocksRequestTimer <-chan time.Time
blockHashesRequestTimer <-chan time.Time
suicideTimer <-chan time.Time
blocksRequests int
blockHashesRequests int
blocksRequestsComplete bool
blockHashesRequestsComplete bool
ready bool
same bool
initialised bool
active bool
step int
idle int
missing int
lastMissing int
depth int
invalid bool
poolRoot bool
}
//
func (self *BlockPool) newSection(nodes []*node) *section {
sec := &section{
bottom: nodes[len(nodes)-1],
top: nodes[0],
nodes: nodes,
poolRootIndex: len(nodes),
bp: self,
controlC: make(chan *peer),
poolRootC: make(chan *peer),
offC: make(chan bool),
}
for i, node := range nodes {
entry := &entry{node: node, section: sec, index: &index{i}}
self.set(node.hash, entry)
}
plog.DebugDetailf("[%s] setup section process", sectionhex(sec))
go sec.run()
return sec
}
func (self *section) addSectionToBlockChain(p *peer) {
self.bp.wg.Add(1)
go func() {
self.lock.Lock()
defer self.lock.Unlock()
defer func() {
self.bp.wg.Done()
}()
var node *node
var keys []string
var blocks []*types.Block
for self.poolRootIndex > 0 {
node = self.nodes[self.poolRootIndex-1]
node.lock.RLock()
block := node.block
node.lock.RUnlock()
if block == nil {
break
}
self.poolRootIndex--
keys = append(keys, string(node.hash))
blocks = append(blocks, block)
}
if len(blocks) == 0 {
return
}
self.bp.lock.Lock()
for _, key := range keys {
delete(self.bp.pool, key)
}
self.bp.lock.Unlock()
plog.Infof("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash()))
err := self.bp.insertChain(blocks)
if err != nil {
self.invalid = true
self.bp.peers.peerError(node.blockBy, ErrInvalidBlock, "%v", err)
plog.Warnf("invalid block %x", node.hash)
plog.Warnf("penalise peers %v (hash), %v (block)", node.hashBy, node.blockBy)
// or invalid block and the entire chain needs to be removed
self.removeInvalidChain()
} else {
// if all blocks inserted in this section
// then need to try to insert blocks in child section
if self.poolRootIndex == 0 {
// if there is a child section, then recursively call itself:
// also if section process is not terminated,
// then signal blockchain connectivity with poolRootC
if child := self.bp.getChild(self); child != nil {
select {
case <-child.offC:
plog.DebugDetailf("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
case child.poolRootC <- p:
plog.DebugDetailf("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
}
child.addSectionToBlockChain(p)
} else {
plog.DebugDetailf("[%s] no child section in pool", sectionhex(self))
}
plog.DebugDetailf("[%s] section completely inserted to blockchain - remove", sectionhex(self))
// complete sections are removed. if called from within section process,
// this must run in its own go routine to avoid deadlock
self.remove()
}
}
self.bp.status.lock.Lock()
if err == nil {
headKey := string(blocks[0].ParentHash())
height := self.bp.status.chain[headKey] + len(blocks)
self.bp.status.chain[string(blocks[len(blocks)-1].Hash())] = height
if height > self.bp.status.values.LongestChain {
self.bp.status.values.LongestChain = height
}
delete(self.bp.status.chain, headKey)
}
self.bp.status.values.BlocksInChain += len(blocks)
self.bp.status.values.BlocksInPool -= len(blocks)
if err != nil {
self.bp.status.badPeers[node.blockBy]++
}
self.bp.status.lock.Unlock()
}()
}
func (self *section) run() {
// absolute time after which sub-chain is killed if not complete (some blocks are missing)
self.suicideC = make(chan bool)
self.forkC = make(chan chan bool)
self.suicideTimer = time.After(self.bp.Config.BlocksTimeout)
// node channels for the section
// container for missing block hashes
var checking bool
var ping = time.NewTicker(5 * time.Second)
LOOP:
for !self.blockHashesRequestsComplete || !self.blocksRequestsComplete {
select {
case <-ping.C:
var name = "no peer"
if self.peer != nil {
name = self.peer.id
}
plog.DebugDetailf("[%s] peer <%s> active: %v", sectionhex(self), name, self.active)
// global quit from blockpool
case <-self.bp.quit:
break LOOP
// pause for peer switching
case <-self.switchC:
self.switchC = nil
case p := <-self.poolRootC:
// signal on pool root channel indicates that the blockpool is
// connected to the blockchain, insert the longest chain of blocks
// ignored in idle mode to avoid inserting chain sections of non-live peers
self.poolRoot = true
// switch off hash requests in case they were on
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
self.switchOn(p)
// peer quit or demoted, put section in idle mode
case <-self.idleC:
// peer quit or demoted, put section in idle mode
plog.Debugf("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id)
self.switchOff()
self.idleC = nil
// timebomb - if section is not complete in time, nuke the entire chain
case <-self.suicideTimer:
self.suicide()
plog.Debugf("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.suicideTimer = nil
// closing suicideC triggers section suicide: removes section nodes from pool and terminates section process
case <-self.suicideC:
plog.DebugDetailf("[%s] suicide", sectionhex(self))
self.unlink()
self.bp.remove(self)
plog.DebugDetailf("[%s] done", sectionhex(self))
break LOOP
// alarm for checking blocks in the section
case <-self.blocksRequestTimer:
plog.DebugDetailf("[%s] alarm: block request time", sectionhex(self))
self.processC = self.missingC
// alarm for checking parent of the section or sending out hash requests
case <-self.blockHashesRequestTimer:
plog.DebugDetailf("[%s] alarm: hash request time", sectionhex(self))
self.blockHashesRequest()
// activate this section process with a peer
case p := <-self.controlC:
if p == nil {
self.switchOff()
} else {
self.switchOn(p)
}
self.bp.wg.Done()
// blocks the process until section is split at the fork
case waiter := <-self.forkC:
<-waiter
self.initialised = false
self.quitInitC = nil
//
case n, ok := <-self.processC:
// channel closed, first iteration finished
if !ok && !self.initialised {
plog.DebugDetailf("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
self.initialised = true
self.processC = nil
// self.processC = make(chan *node, self.missing)
self.checkRound()
checking = false
break
}
plog.DebugDetailf("[%s] section proc step %v: missing %v/%v/%v", sectionhex(self), self.step, self.missing, self.lastMissing, self.depth)
if !checking {
self.step = 0
self.missing = 0
checking = true
}
self.step++
n.lock.RLock()
block := n.block
n.lock.RUnlock()
// if node has no block, request it (buffer it for batch request)
// feed it to missingC channel for the next round
if block == nil {
pos := self.missing % self.bp.Config.BlockBatchSize
if pos == 0 {
if self.missing != 0 {
self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:])
}
self.blockHashes = self.bp.getHashSlice()
}
self.blockHashes[pos] = n.hash
self.missing++
self.missingC <- n
} else {
// checking for parent block
if self.poolRoot {
// if node has got block (received via async AddBlock call from protocol)
if self.step == self.lastMissing {
// current root of the pool
plog.DebugDetailf("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash))
self.addSectionToBlockChain(self.peer)
}
} else {
if self.parentHash == nil && n == self.bottom {
self.parentHash = block.ParentHash()
plog.DebugDetailf("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash))
self.blockHashesRequest()
}
}
}
if self.initialised && self.step == self.lastMissing {
plog.DebugDetailf("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.checkRound()
checking = false
}
} // select
} // for
close(self.offC)
if self.peer != nil {
self.active = false
self.bp.wg.Done()
}
plog.DebugDetailf("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests)
}
func (self *section) switchOn(newpeer *peer) {
oldpeer := self.peer
// reset switchC/switchC to current best peer
self.idleC = newpeer.idleC
self.switchC = newpeer.switchC
self.peer = newpeer
if oldpeer != newpeer {
oldp := "no peer"
newp := "no peer"
if oldpeer != nil {
oldp = oldpeer.id
}
if newpeer != nil {
newp = newpeer.id
}
plog.DebugDetailf("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp)
}
// activate section with current peer
if oldpeer == nil {
self.bp.wg.Add(1)
self.active = true
if !self.blockHashesRequestsComplete {
self.blockHashesRequestTimer = time.After(0)
}
if !self.blocksRequestsComplete {
if !self.initialised {
if self.quitInitC != nil {
<-self.quitInitC
}
self.missingC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
self.processC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
self.quitInitC = make(chan bool)
self.step = 0
self.missing = 0
self.depth = len(self.nodes)
self.lastMissing = self.depth
self.feedNodes()
} else {
self.blocksRequestTimer = time.After(0)
}
}
}
}
// put the section to idle mode
func (self *section) switchOff() {
// active -> idle
if self.peer != nil {
oldp := "no peer"
oldpeer := self.peer
if oldpeer != nil {
oldp = oldpeer.id
}
plog.DebugDetailf("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.active = false
self.peer = nil
// turn off timers
self.blocksRequestTimer = nil
self.blockHashesRequestTimer = nil
if self.quitInitC != nil {
<-self.quitInitC
self.quitInitC = nil
}
self.processC = nil
self.bp.wg.Done()
}
}
// iterates through nodes of a section to feed processC
// used to initialise chain section
func (self *section) feedNodes() {
// if not run at least once fully, launch iterator
self.bp.wg.Add(1)
go func() {
self.lock.Lock()
defer self.lock.Unlock()
defer func() {
self.bp.wg.Done()
}()
var n *node
INIT:
for _, n = range self.nodes {
select {
case self.processC <- n:
case <-self.bp.quit:
break INIT
}
}
close(self.processC)
close(self.quitInitC)
}()
}
func (self *section) blockHashesRequest() {
if self.switchC != nil {
self.bp.chainLock.Lock()
parentSection := self.parent
if parentSection == nil {
// only link to new parent if not switching peers
// this protects against synchronisation issue where during switching
// a demoted peer's fork will be chosen over the best peer's chain
// because relinking the correct chain (activateChain) is overwritten here in
// demoted peer's section process just before the section is put to idle mode
if self.parentHash != nil {
if parent := self.bp.get(self.parentHash); parent != nil {
parentSection = parent.section
plog.DebugDetailf("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection))
link(parentSection, self)
} else {
if self.bp.hasBlock(self.parentHash) {
self.poolRoot = true
plog.DebugDetailf("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self))
self.addSectionToBlockChain(self.peer)
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
}
}
}
}
self.bp.chainLock.Unlock()
if !self.poolRoot {
if parentSection != nil {
// activate parent section with this peer
// but only if not during switch mode
plog.DebugDetailf("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection))
self.bp.activateChain(parentSection, self.peer, nil)
// if not root of chain, switch off
plog.DebugDetailf("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests)
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
} else {
self.blockHashesRequests++
plog.DebugDetailf("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests)
self.peer.requestBlockHashes(self.bottom.hash)
self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
}
}
}
}
// checks number of missing blocks after each round of request and acts accordingly
func (self *section) checkRound() {
if self.missing == 0 {
// no missing blocks
plog.DebugDetailf("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.blocksRequestsComplete = true
self.blocksRequestTimer = nil
} else {
// some missing blocks
plog.DebugDetailf("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
self.blocksRequests++
pos := self.missing % self.bp.Config.BlockBatchSize
if pos == 0 {
pos = self.bp.Config.BlockBatchSize
}
self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:pos])
// handle idle rounds
if self.missing == self.lastMissing {
// idle round
if self.same {
// more than once
self.idle++
// too many idle rounds
if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds {
plog.DebugDetailf("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.suicide()
}
} else {
self.idle = 0
}
self.same = true
} else {
self.same = false
}
self.lastMissing = self.missing
// put processC offline
self.processC = nil
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
}
}
/*
link connects two sections via parent/child fields
creating a doubly linked list
caller must hold BlockPool chainLock
*/
func link(parent *section, child *section) {
if parent != nil {
exChild := parent.child
parent.child = child
if exChild != nil && exChild != child {
if child != nil {
// if child is nil it is not a real fork
plog.DebugDetailf("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child))
}
exChild.parent = nil
}
}
if child != nil {
exParent := child.parent
if exParent != nil && exParent != parent {
if parent != nil {
// if parent is nil it is not a real fork, but suicide delinking section
plog.DebugDetailf("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent))
}
exParent.child = nil
}
child.parent = parent
}
}
/*
handle forks where connecting node is mid-section
by splitting section at fork
no splitting needed if connecting node is head of a section
caller must hold chain lock
*/
func (self *BlockPool) splitSection(parent *section, entry *entry) {
plog.DebugDetailf("[%s] split section at fork", sectionhex(parent))
parent.deactivate()
waiter := make(chan bool)
parent.wait(waiter)
chain := parent.nodes
parent.nodes = chain[entry.index.int:]
parent.top = parent.nodes[0]
parent.poolRootIndex -= entry.index.int
orphan := self.newSection(chain[0:entry.index.int])
link(orphan, parent.child)
close(waiter)
orphan.deactivate()
}
func (self *section) wait(waiter chan bool) {
self.forkC <- waiter
}
func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec *section) {
// if new section is created, link it to parent/child sections
// and launch section process fetching block and further hashes
if len(nodes) > 0 {
sec = self.newSection(nodes)
plog.Debugf("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child))
link(parent, sec)
link(sec, child)
} else {
// now this can only happen if we allow response to hash request to include <from> hash
// in this case we just link parent and child (without needing root block of child section)
plog.Debugf("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child))
link(parent, child)
}
return
}
func (self *section) activate(p *peer) {
self.bp.wg.Add(1)
select {
case <-self.offC:
self.bp.wg.Done()
case self.controlC <- p:
plog.DebugDetailf("[%s] activate section process for peer <%s>", sectionhex(self), p.id)
}
}
func (self *section) deactivate() {
self.bp.wg.Add(1)
self.controlC <- nil
}
func (self *section) suicide() {
select {
case <-self.suicideC:
return
default:
}
close(self.suicideC)
}
// removes this section exacly
func (self *section) remove() {
select {
case <-self.offC:
// section is complete, no process
self.unlink()
self.bp.remove(self)
close(self.suicideC)
plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
case <-self.suicideC:
plog.DebugDetailf("[%s] remove: suicided already", sectionhex(self))
default:
plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
close(self.suicideC)
}
plog.DebugDetailf("[%s] removed section.", sectionhex(self))
}
// remove a section and all its descendents from the pool
func (self *section) removeInvalidChain() {
// need to get the child before removeSection delinks the section
self.bp.chainLock.RLock()
child := self.child
self.bp.chainLock.RUnlock()
plog.DebugDetailf("[%s] remove invalid chain", sectionhex(self))
self.remove()
if child != nil {
child.removeInvalidChain()
}
}
// unlink a section from its parent/child
func (self *section) unlink() {
// first delink from child and parent under chainlock
self.bp.chainLock.Lock()
link(nil, self)
link(self, nil)
self.bp.chainLock.Unlock()
}

110
blockpool/status.go Normal file

@ -0,0 +1,110 @@
package blockpool
import (
"fmt"
"sync"
)
type statusValues struct {
BlockHashes int // number of hashes fetched this session
BlockHashesInPool int // number of hashes currently in the pool
Blocks int // number of blocks fetched this session
BlocksInPool int // number of blocks currently in the pool
BlocksInChain int // number of blocks inserted/connected to the blockchain this session
NewBlocks int // number of new blocks (received with new blocks msg) this session
Forks int // number of chain forks in the blockchain (poolchain) this session
LongestChain int // the longest chain inserted since the start of session (aka session blockchain height)
BestPeer []byte //Pubkey
Syncing bool // requesting, updating etc
Peers int // cumulative number of all different registered peers since the start of this session
ActivePeers int // cumulative number of all different peers that contributed a hash or block since the start of this session
LivePeers int // number of live peers registered with the block pool (supposed to be redundant but good sanity check
BestPeers int // cumulative number of all peers that at some point were promoted as best peer (peer with highest TD status) this session
BadPeers int // cumulative number of all peers that violated the protocol (invalid block or pow, unrequested hash or block, etc)
}
type status struct {
lock sync.Mutex
values statusValues
chain map[string]int
peers map[string]int
bestPeers map[string]int
badPeers map[string]int
activePeers map[string]int
}
func newStatus() *status {
return &status{
chain: make(map[string]int),
peers: make(map[string]int),
bestPeers: make(map[string]int),
badPeers: make(map[string]int),
activePeers: make(map[string]int),
}
}
type Status struct {
statusValues
}
// blockpool status for reporting
func (self *BlockPool) Status() *Status {
self.status.lock.Lock()
defer self.status.lock.Unlock()
self.status.values.BlockHashesInPool = len(self.pool)
self.status.values.ActivePeers = len(self.status.activePeers)
self.status.values.BestPeers = len(self.status.bestPeers)
self.status.values.BadPeers = len(self.status.badPeers)
self.status.values.LivePeers = len(self.peers.peers)
self.status.values.Peers = len(self.status.peers)
self.status.values.BlockHashesInPool = len(self.pool)
return &Status{self.status.values}
}
func (self *Status) String() string {
return fmt.Sprintf(`
Syncing: %v
BlockHashes: %v
BlockHashesInPool: %v
Blocks: %v
BlocksInPool: %v
BlocksInChain: %v
NewBlocks: %v
Forks: %v
LongestChain: %v
Peers: %v
LivePeers: %v
ActivePeers: %v
BestPeers: %v
BadPeers: %v
`,
self.Syncing,
self.BlockHashes,
self.BlockHashesInPool,
self.Blocks,
self.BlocksInPool,
self.BlocksInChain,
self.NewBlocks,
self.Forks,
self.LongestChain,
self.Peers,
self.LivePeers,
self.ActivePeers,
self.BestPeers,
self.BadPeers,
)
}
func (self *BlockPool) syncing() {
self.status.lock.Lock()
defer self.status.lock.Unlock()
if !self.status.values.Syncing {
self.status.values.Syncing = true
go func() {
self.wg.Wait()
self.status.lock.Lock()
self.status.values.Syncing = false
self.status.lock.Unlock()
}()
}
}

228
blockpool/status_test.go Normal file

@ -0,0 +1,228 @@
package blockpool
import (
"fmt"
"testing"
// "time"
"github.com/ethereum/go-ethereum/blockpool/test"
)
var statusFields = []string{
"BlockHashes",
"BlockHashesInPool",
"Blocks",
"BlocksInPool",
"BlocksInChain",
"NewBlocks",
"Forks",
"LongestChain",
"Peers",
"LivePeers",
"ActivePeers",
"BestPeers",
"BadPeers",
}
func getStatusValues(s *Status) []int {
return []int{
s.BlockHashes,
s.BlockHashesInPool,
s.Blocks,
s.BlocksInPool,
s.BlocksInChain,
s.NewBlocks,
s.Forks,
s.LongestChain,
s.Peers,
s.LivePeers,
s.ActivePeers,
s.BestPeers,
s.BadPeers,
}
}
func checkStatus(t *testing.T, bp *BlockPool, syncing bool, expected []int) (err error) {
s := bp.Status()
if s.Syncing != syncing {
t.Errorf("status for Syncing incorrect. expected %v, got %v", syncing, s.Syncing)
}
got := getStatusValues(s)
for i, v := range expected {
err = test.CheckInt(statusFields[i], got[i], v, t)
if err != nil {
return err
}
fmt.Printf("%v: %v (%v)\n", statusFields[i], got[i], v)
}
return
}
// func TestBlockPoolStatus(t *testing.T) {
// test.LogInit()
// _, blockPool, blockPoolTester := newTestBlockPool(t)
// blockPoolTester.blockChain[0] = nil
// blockPoolTester.initRefBlockChain(12)
// blockPoolTester.refBlockChain[3] = []int{4, 7}
// delete(blockPoolTester.refBlockChain, 6)
// blockPool.Start()
// peer1 := blockPoolTester.newPeer("peer1", 1, 9)
// peer2 := blockPoolTester.newPeer("peer2", 2, 6)
// peer3 := blockPoolTester.newPeer("peer3", 3, 11)
// peer4 := blockPoolTester.newPeer("peer4", 1, 9)
// peer2.blocksRequestsMap = peer1.blocksRequestsMap
// var expected []int
// var err error
// expected = []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
// err = checkStatus(t, blockPool, false, expected)
// if err != nil {
// return
// }
// peer1.AddPeer()
// expected = []int{0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlocks(8, 9)
// expected = []int{0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlockHashes(9, 8, 7, 3, 2)
// expected = []int{5, 5, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlocks(3, 7, 8)
// expected = []int{5, 5, 3, 3, 0, 1, 0, 0, 1, 1, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlocks(2, 3)
// expected = []int{5, 5, 4, 4, 0, 1, 0, 0, 1, 1, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer4.AddPeer()
// expected = []int{5, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer4.sendBlockHashes(12, 11)
// expected = []int{5, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer2.AddPeer()
// expected = []int{5, 5, 4, 4, 0, 3, 0, 0, 3, 3, 1, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer2.serveBlocks(5, 6)
// peer2.serveBlockHashes(6, 5, 4, 3, 2)
// expected = []int{8, 8, 5, 5, 0, 3, 1, 0, 3, 3, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer2.serveBlocks(2, 3, 4)
// expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 3, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// blockPool.RemovePeer("peer2")
// expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlockHashes(2, 1, 0)
// expected = []int{9, 9, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlocks(1, 2)
// expected = []int{9, 9, 7, 7, 0, 3, 1, 0, 3, 2, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer1.serveBlocks(4, 5)
// expected = []int{9, 9, 8, 8, 0, 3, 1, 0, 3, 2, 2, 2, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer3.AddPeer()
// expected = []int{9, 9, 8, 8, 0, 4, 1, 0, 4, 3, 2, 3, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer3.serveBlocks(10, 11)
// expected = []int{9, 9, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer3.serveBlockHashes(11, 10, 9)
// expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer4.sendBlocks(11, 12)
// expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 4, 3, 1}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer3.serveBlocks(9, 10)
// expected = []int{11, 11, 10, 10, 0, 4, 1, 0, 4, 3, 4, 3, 1}
// err = checkStatus(t, blockPool, true, expected)
// if err != nil {
// return
// }
// peer3.serveBlocks(0, 1)
// blockPool.Wait(waitTimeout)
// time.Sleep(200 * time.Millisecond)
// expected = []int{11, 3, 11, 3, 8, 4, 1, 8, 4, 3, 4, 3, 1}
// err = checkStatus(t, blockPool, false, expected)
// if err != nil {
// return
// }
// blockPool.Stop()
// }

@ -0,0 +1,57 @@
package test
import (
"sync"
"github.com/ethereum/go-ethereum/crypto"
)
// test helpers
// TODO: move into common test helper package (see p2p/crypto etc.)
func NewHashPool() *TestHashPool {
return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)}
}
type intToHash map[int][]byte
type hashToInt map[string]int
// hashPool is a test helper, that allows random hashes to be referred to by integers
type TestHashPool struct {
intToHash
hashToInt
lock sync.Mutex
}
func newHash(i int) []byte {
return crypto.Sha3([]byte(string(i)))
}
func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) {
self.lock.Lock()
defer self.lock.Unlock()
for _, i := range indexes {
hash, found := self.intToHash[i]
if !found {
hash = newHash(i)
self.intToHash[i] = hash
self.hashToInt[string(hash)] = i
}
hashes = append(hashes, hash)
}
return
}
func (self *TestHashPool) HashesToIndexes(hashes [][]byte) (indexes []int) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range hashes {
i, found := self.hashToInt[string(hash)]
if !found {
i = -1
}
indexes = append(indexes, i)
}
return
}

78
blockpool/test/logger.go Normal file

@ -0,0 +1,78 @@
package test
import (
"log"
"os"
"sync"
"testing"
"github.com/ethereum/go-ethereum/logger"
)
var once sync.Once
/* usage:
func TestFunc(t *testing.T) {
test.LogInit()
// test
}
*/
func LogInit() {
once.Do(func() {
var logsys = logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.LogLevel(logger.DebugDetailLevel))
logger.AddLogSystem(logsys)
})
}
type testLogger struct{ t *testing.T }
/* usage:
func TestFunc(t *testing.T) {
defer test.Testlog.Detach()
// test
}
*/
func Testlog(t *testing.T) testLogger {
logger.Reset()
l := testLogger{t}
logger.AddLogSystem(l)
return l
}
func (testLogger) GetLogLevel() logger.LogLevel { return logger.DebugLevel }
func (testLogger) SetLogLevel(logger.LogLevel) {}
func (l testLogger) LogPrint(level logger.LogLevel, msg string) {
l.t.Logf("%s", msg)
}
func (testLogger) Detach() {
logger.Flush()
logger.Reset()
}
type benchLogger struct{ b *testing.B }
/* usage:
func BenchmarkFunc(b *testing.B) {
defer test.Benchlog.Detach()
// test
}
*/
func Benchlog(b *testing.B) benchLogger {
logger.Reset()
l := benchLogger{b}
logger.AddLogSystem(l)
return l
}
func (benchLogger) GetLogLevel() logger.LogLevel { return logger.Silence }
func (benchLogger) SetLogLevel(logger.LogLevel) {}
func (l benchLogger) LogPrint(level logger.LogLevel, msg string) {
l.b.Logf("%s", msg)
}
func (benchLogger) Detach() {
logger.Flush()
logger.Reset()
}

35
blockpool/test/util.go Normal file

@ -0,0 +1,35 @@
package test
import (
"fmt"
"testing"
"time"
)
func CheckInt(name string, got int, expected int, t *testing.T) (err error) {
if got != expected {
t.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got)
err = fmt.Errorf("")
}
return
}
func CheckDuration(name string, got time.Duration, expected time.Duration, t *testing.T) (err error) {
if got != expected {
t.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got)
err = fmt.Errorf("")
}
return
}
func ArrayEq(a, b []int) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}