eth: propagate blocks and transactions async

This commit is contained in:
Péter Szilágyi 2018-05-21 11:32:42 +03:00
parent ab6bdbd9b0
commit d9cee2c172
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
2 changed files with 117 additions and 12 deletions

@ -698,7 +698,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Send the block to a subset of our peers // Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))] transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer { for _, peer := range transfer {
peer.SendNewBlock(block, td) peer.AsyncSendNewBlock(block, td)
} }
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return return
@ -706,7 +706,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it // Otherwise if the block is indeed in out own chain, announce it
if pm.blockchain.HasBlock(hash, block.NumberU64()) { if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers { for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) peer.AsyncSendNewBlockHash(block)
} }
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
} }
@ -727,7 +727,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
} }
// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for peer, txs := range txset { for peer, txs := range txset {
peer.SendTransactions(txs) peer.AsyncSendTransactions(txs)
} }
} }

@ -39,6 +39,22 @@ var (
const ( const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
maxQueuedProps = 4
// maxQueuedAnns is the maximum number of block announcements to queue up before
// dropping broadcasts. Similarly to block propagations, there's no point to queue
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
handshakeTimeout = 5 * time.Second handshakeTimeout = 5 * time.Second
) )
@ -50,6 +66,12 @@ type PeerInfo struct {
Head string `json:"head"` // SHA3 hash of the peer's best owned block Head string `json:"head"` // SHA3 hash of the peer's best owned block
} }
// propEvent is a block propagation, waiting for its turn in the broadcast queue.
type propEvent struct {
block *types.Block
td *big.Int
}
type peer struct { type peer struct {
id string id string
@ -65,21 +87,62 @@ type peer struct {
knownTxs *set.Set // Set of transaction hashes known to be known by this peer knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer knownBlocks *set.Set // Set of block hashes known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
term chan struct{} // Termination channel to stop the broadcaster
} }
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
id := p.ID()
return &peer{ return &peer{
Peer: p, Peer: p,
rw: rw, rw: rw,
version: version, version: version,
id: fmt.Sprintf("%x", id[:8]), id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: set.New(), knownTxs: set.New(),
knownBlocks: set.New(), knownBlocks: set.New(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
term: make(chan struct{}),
} }
} }
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
select {
case txs := <-p.queuedTxs:
if err := p.SendTransactions(txs); err != nil {
return
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
case block := <-p.queuedAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
case <-p.term:
return
}
}
}
// close signals the broadcast goroutine to terminate.
func (p *peer) close() {
close(p.term)
}
// Info gathers and returns a collection of metadata known about a peer. // Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo { func (p *peer) Info() *PeerInfo {
hash, td := p.Head() hash, td := p.Head()
@ -139,6 +202,19 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
return p2p.Send(p.rw, TxMsg, txs) return p2p.Send(p.rw, TxMsg, txs)
} }
// AsyncSendTransactions queues list of transactions propagation to a remote
// peer. If the peer's broadcast queue is full, the event is silently dropped.
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
select {
case p.queuedTxs <- txs:
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
default:
p.Log().Debug("Dropping transaction propagation", "count", len(txs))
}
}
// SendNewBlockHashes announces the availability of a number of blocks through // SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification. // a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@ -153,12 +229,35 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
return p2p.Send(p.rw, NewBlockHashesMsg, request) return p2p.Send(p.rw, NewBlockHashesMsg, request)
} }
// AsyncSendNewBlockHash queues the availability of a block for propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
select {
case p.queuedAnns <- block:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
}
}
// SendNewBlock propagates an entire block to a remote peer. // SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash()) p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
} }
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
// the peer's broadcast queue is full, the event is silently dropped.
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
case p.queuedProps <- &propEvent{block: block, td: td}:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
}
}
// SendBlockHeaders sends a batch of block headers to the remote peer. // SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error { func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers) return p2p.Send(p.rw, BlockHeadersMsg, headers)
@ -313,7 +412,8 @@ func newPeerSet() *peerSet {
} }
// Register injects a new peer into the working set, or returns an error if the // Register injects a new peer into the working set, or returns an error if the
// peer is already known. // peer is already known. If a new peer it registered, its broadcast loop is also
// started.
func (ps *peerSet) Register(p *peer) error { func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() defer ps.lock.Unlock()
@ -325,6 +425,8 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered return errAlreadyRegistered
} }
ps.peers[p.id] = p ps.peers[p.id] = p
go p.broadcast()
return nil return nil
} }
@ -334,10 +436,13 @@ func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() defer ps.lock.Unlock()
if _, ok := ps.peers[id]; !ok { p, ok := ps.peers[id]
if !ok {
return errNotRegistered return errNotRegistered
} }
delete(ps.peers, id) delete(ps.peers, id)
p.close()
return nil return nil
} }