Merge pull request #1035 from karalabe/eth-threadsafe-peers

eth: make the peer-set thread safe
This commit is contained in:
Jeffrey Wilcke 2015-05-19 08:50:38 -07:00
commit 46d6470c43
3 changed files with 150 additions and 68 deletions

@ -47,9 +47,7 @@ type ProtocolManager struct {
txpool txPool
chainman *core.ChainManager
downloader *downloader.Downloader
pmu sync.Mutex
peers map[string]*peer
peers *peerSet
SubProtocol p2p.Protocol
@ -73,7 +71,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txpool: txpool,
chainman: chainman,
downloader: downloader,
peers: make(map[string]*peer),
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
quitSync: make(chan struct{}),
}
@ -95,10 +93,14 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
}
func (pm *ProtocolManager) removePeer(peer *peer) {
pm.pmu.Lock()
defer pm.pmu.Unlock()
// Unregister the peer from the downloader
pm.downloader.UnregisterPeer(peer.id)
delete(pm.peers, peer.id)
// Remove the peer from the Ethereum peer set too
glog.V(logger.Detail).Infoln("Removing peer", peer.id)
if err := pm.peers.Unregister(peer.id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
}
}
func (pm *ProtocolManager) Start() {
@ -136,31 +138,32 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
}
func (pm *ProtocolManager) handle(p *peer) error {
// Execute the Ethereum handshake, short circuit if fails
if err := p.handleStatus(); err != nil {
return err
}
pm.pmu.Lock()
pm.peers[p.id] = p
pm.pmu.Unlock()
pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
defer func() {
pm.removePeer(p)
}()
// Register the peer locally and in the downloader too
glog.V(logger.Detail).Infoln("Adding peer", p.id)
if err := pm.peers.Register(p); err != nil {
glog.V(logger.Error).Infoln("Addition failed:", err)
return err
}
defer pm.removePeer(p)
if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil {
return err
}
// propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
return err
}
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
return err
}
}
return nil
}
@ -346,18 +349,8 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
pm.pmu.Lock()
defer pm.pmu.Unlock()
// Find peers who don't know anything about the given hash. Peers that
// don't know about the hash will be a candidate for the broadcast loop
var peers []*peer
for _, peer := range pm.peers {
if !peer.blockHashes.Has(hash) {
peers = append(peers, peer)
}
}
// Broadcast block to peer set
// Broadcast block to a batch of peers not knowing about it
peers := pm.peers.BlockLackingPeers(hash)
peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendNewBlock(block)
@ -369,18 +362,8 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
pm.pmu.Lock()
defer pm.pmu.Unlock()
// Find peers who don't know anything about the given hash. Peers that
// don't know about the hash will be a candidate for the broadcast loop
var peers []*peer
for _, peer := range pm.peers {
if !peer.txHashes.Has(hash) {
peers = append(peers, peer)
}
}
// Broadcast block to peer set
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.TxLackingPeers(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendTransaction(tx)

@ -1,8 +1,10 @@
package eth
import (
"errors"
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -12,6 +14,11 @@ import (
"gopkg.in/fatih/set.v0"
)
var (
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
)
type statusMsgData struct {
ProtocolVersion uint32
NetworkId uint32
@ -25,16 +32,6 @@ type getBlockHashesMsgData struct {
Amount uint64
}
func getBestPeer(peers map[string]*peer) *peer {
var peer *peer
for _, cp := range peers {
if peer == nil || cp.td.Cmp(peer.td) > 0 {
peer = cp
}
}
return peer
}
type peer struct {
*p2p.Peer
@ -159,3 +156,103 @@ func (p *peer) handleStatus() error {
return <-errc
}
// peerSet represents the collection of active peers currently participating in
// the Ethereum sub-protocol.
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
}
// newPeerSet creates a new peer set to track the active participants.
func newPeerSet() *peerSet {
return &peerSet{
peers: make(map[string]*peer),
}
}
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
}
ps.peers[p.id] = p
return nil
}
// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[id]; !ok {
return errNotRegistered
}
delete(ps.peers, id)
return nil
}
// Peer retrieves the registered peer with the given id.
func (ps *peerSet) Peer(id string) *peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
return ps.peers[id]
}
// Len returns if the current number of peers in the set.
func (ps *peerSet) Len() int {
ps.lock.RLock()
defer ps.lock.RUnlock()
return len(ps.peers)
}
// BlockLackingPeers retrieves a list of peers that do not have a given block
// in their set of known hashes.
func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.blockHashes.Has(hash) {
list = append(list, p)
}
}
return list
}
// TxLackingPeers retrieves a list of peers that do not have a given transaction
// in their set of known hashes.
func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.txHashes.Has(hash) {
list = append(list, p)
}
}
return list
}
// BestPeer retrieves the known peer with the currently highest total difficulty.
func (ps *peerSet) BestPeer() *peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
var best *peer
for _, p := range ps.peers {
if best == nil || p.td.Cmp(best.td) > 0 {
best = p
}
}
return best
}

@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
// Sync contains all synchronisation code for the eth protocol
// update periodically tries to synchronise with the network, both downloading
// hashes and blocks as well as retrieving cached ones.
func (pm *ProtocolManager) update() {
forceSync := time.Tick(forceSyncCycle)
blockProc := time.Tick(blockProcCycle)
@ -20,22 +20,16 @@ func (pm *ProtocolManager) update() {
for {
select {
case <-pm.newPeerCh:
// Meet the `minDesiredPeerCount` before we select our best peer
if len(pm.peers) < minDesiredPeerCount {
// Make sure we have peers to select from, then sync
if pm.peers.Len() < minDesiredPeerCount {
break
}
// Find the best peer and synchronise with it
peer := getBestPeer(pm.peers)
if peer == nil {
glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
}
go pm.synchronise(peer)
go pm.synchronise(pm.peers.BestPeer())
case <-forceSync:
// Force a sync even if not enough peers are present
if peer := getBestPeer(pm.peers); peer != nil {
go pm.synchronise(peer)
}
go pm.synchronise(pm.peers.BestPeer())
case <-blockProc:
// Try to pull some blocks from the downloaded
if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
@ -51,10 +45,9 @@ func (pm *ProtocolManager) update() {
}
}
// processBlocks will attempt to reconstruct a chain by checking the first item and check if it's
// a known parent. The first block in the chain may be unknown during downloading. When the
// downloader isn't downloading blocks will be dropped with an unknown parent until either it
// has depleted the list or found a known parent.
// processBlocks retrieves downloaded blocks from the download cache and tries
// to construct the local block chain with it. Note, since the block retrieval
// order matters, access to this function *must* be synchronized/serialized.
func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1)
defer pm.wg.Done()
@ -79,15 +72,24 @@ func (pm *ProtocolManager) processBlocks() error {
return nil
}
// synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (pm *ProtocolManager) synchronise(peer *peer) {
// Short circuit if no peers are available
if peer == nil {
glog.V(logger.Debug).Infoln("Synchronisation canceled: no peers available")
return
}
// Make sure the peer's TD is higher than our own. If not drop.
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
glog.V(logger.Debug).Infoln("Synchronisation canceled: peer TD too small")
return
}
// FIXME if we have the hash in our chain and the TD of the peer is
// much higher than ours, something is wrong with us or the peer.
// Check if the hash is on our own chain
if pm.chainman.HasBlock(peer.recentHash) {
glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known")
return
}
// Get the hashes from the peer (synchronously)