eth: limit number of sent transactions based on message size
Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth.
This commit is contained in:
parent
41b2008a66
commit
6c73a59806
@ -53,9 +53,11 @@ type ProtocolManager struct {
|
|||||||
txSub event.Subscription
|
txSub event.Subscription
|
||||||
minedBlockSub event.Subscription
|
minedBlockSub event.Subscription
|
||||||
|
|
||||||
|
// channels for fetcher, syncer, txsyncLoop
|
||||||
newPeerCh chan *peer
|
newPeerCh chan *peer
|
||||||
newHashCh chan []*blockAnnounce
|
newHashCh chan []*blockAnnounce
|
||||||
newBlockCh chan chan []*types.Block
|
newBlockCh chan chan []*types.Block
|
||||||
|
txsyncCh chan *txsync
|
||||||
quitSync chan struct{}
|
quitSync chan struct{}
|
||||||
|
|
||||||
// wait group is used for graceful shutdowns during downloading
|
// wait group is used for graceful shutdowns during downloading
|
||||||
@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
|||||||
newPeerCh: make(chan *peer, 1),
|
newPeerCh: make(chan *peer, 1),
|
||||||
newHashCh: make(chan []*blockAnnounce, 1),
|
newHashCh: make(chan []*blockAnnounce, 1),
|
||||||
newBlockCh: make(chan chan []*types.Block),
|
newBlockCh: make(chan chan []*types.Block),
|
||||||
|
txsyncCh: make(chan *txsync),
|
||||||
quitSync: make(chan struct{}),
|
quitSync: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
manager.SubProtocol = p2p.Protocol{
|
manager.SubProtocol = p2p.Protocol{
|
||||||
Name: "eth",
|
Name: "eth",
|
||||||
Version: uint(protocolVersion),
|
Version: uint(protocolVersion),
|
||||||
@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() {
|
|||||||
// broadcast transactions
|
// broadcast transactions
|
||||||
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
|
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
|
||||||
go pm.txBroadcastLoop()
|
go pm.txBroadcastLoop()
|
||||||
|
|
||||||
// broadcast mined blocks
|
// broadcast mined blocks
|
||||||
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
|
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
|
||||||
go pm.minedBroadcastLoop()
|
go pm.minedBroadcastLoop()
|
||||||
|
|
||||||
|
// start sync handlers
|
||||||
go pm.syncer()
|
go pm.syncer()
|
||||||
go pm.fetcher()
|
go pm.fetcher()
|
||||||
|
go pm.txsyncLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) Stop() {
|
func (pm *ProtocolManager) Stop() {
|
||||||
@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() {
|
|||||||
pm.quit = true
|
pm.quit = true
|
||||||
pm.txSub.Unsubscribe() // quits txBroadcastLoop
|
pm.txSub.Unsubscribe() // quits txBroadcastLoop
|
||||||
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
|
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
|
||||||
close(pm.quitSync) // quits the sync handler
|
close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
|
||||||
|
|
||||||
// Wait for any process action
|
// Wait for any process action
|
||||||
pm.wg.Wait()
|
pm.wg.Wait()
|
||||||
@ -150,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) handle(p *peer) error {
|
func (pm *ProtocolManager) handle(p *peer) error {
|
||||||
// Execute the Ethereum handshake, short circuit if fails
|
// Execute the Ethereum handshake.
|
||||||
if err := p.handleStatus(); err != nil {
|
if err := p.handleStatus(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Register the peer locally and in the downloader too
|
|
||||||
|
// Register the peer locally.
|
||||||
glog.V(logger.Detail).Infoln("Adding peer", p.id)
|
glog.V(logger.Detail).Infoln("Adding peer", p.id)
|
||||||
if err := pm.peers.Register(p); err != nil {
|
if err := pm.peers.Register(p); err != nil {
|
||||||
glog.V(logger.Error).Infoln("Addition failed:", err)
|
glog.V(logger.Error).Infoln("Addition failed:", err)
|
||||||
@ -162,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
|||||||
}
|
}
|
||||||
defer pm.removePeer(p.id)
|
defer pm.removePeer(p.id)
|
||||||
|
|
||||||
|
// Register the peer in the downloader. If the downloader
|
||||||
|
// considers it banned, we disconnect.
|
||||||
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
|
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// propagate existing transactions. new transactions appearing
|
|
||||||
|
// Propagate existing transactions. new transactions appearing
|
||||||
// after this will be sent via broadcasts.
|
// after this will be sent via broadcasts.
|
||||||
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
|
pm.syncTransactions(p)
|
||||||
return err
|
|
||||||
}
|
|
||||||
// main loop. handle incoming messages.
|
// main loop. handle incoming messages.
|
||||||
for {
|
for {
|
||||||
if err := pm.handleMsg(p); err != nil {
|
if err := pm.handleMsg(p); err != nil {
|
||||||
|
94
eth/sync.go
94
eth/sync.go
@ -2,6 +2,7 @@ package eth
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -10,6 +11,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||||
"github.com/ethereum/go-ethereum/logger"
|
"github.com/ethereum/go-ethereum/logger"
|
||||||
"github.com/ethereum/go-ethereum/logger/glog"
|
"github.com/ethereum/go-ethereum/logger/glog"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -20,6 +22,10 @@ const (
|
|||||||
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
|
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
|
||||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||||
blockProcAmount = 256
|
blockProcAmount = 256
|
||||||
|
|
||||||
|
// This is the target size for the packs of transactions sent by txsyncLoop.
|
||||||
|
// A pack can get larger than this if a single transactions exceeds this size.
|
||||||
|
txsyncPackSize = 100 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// blockAnnounce is the hash notification of the availability of a new block in
|
// blockAnnounce is the hash notification of the availability of a new block in
|
||||||
@ -30,6 +36,94 @@ type blockAnnounce struct {
|
|||||||
time time.Time
|
time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type txsync struct {
|
||||||
|
p *peer
|
||||||
|
txs []*types.Transaction
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncTransactions starts sending all currently pending transactions to the given peer.
|
||||||
|
func (pm *ProtocolManager) syncTransactions(p *peer) {
|
||||||
|
txs := pm.txpool.GetTransactions()
|
||||||
|
if len(txs) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case pm.txsyncCh <- &txsync{p, txs}:
|
||||||
|
case <-pm.quitSync:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// txsyncLoop takes care of the initial transaction sync for each new
|
||||||
|
// connection. When a new peer appears, we relay all currently pending
|
||||||
|
// transactions. In order to minimise egress bandwidth usage, we send
|
||||||
|
// the transactions in small packs to one peer at a time.
|
||||||
|
func (pm *ProtocolManager) txsyncLoop() {
|
||||||
|
var (
|
||||||
|
pending = make(map[discover.NodeID]*txsync)
|
||||||
|
sending = false // whether a send is active
|
||||||
|
pack = new(txsync) // the pack that is being sent
|
||||||
|
done = make(chan error, 1) // result of the send
|
||||||
|
)
|
||||||
|
|
||||||
|
// send starts a sending a pack of transactions from the sync.
|
||||||
|
send := func(s *txsync) {
|
||||||
|
// Fill pack with transactions up to the target size.
|
||||||
|
size := common.StorageSize(0)
|
||||||
|
pack.p = s.p
|
||||||
|
pack.txs = pack.txs[:0]
|
||||||
|
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
|
||||||
|
pack.txs = append(pack.txs, s.txs[i])
|
||||||
|
size += s.txs[i].Size()
|
||||||
|
}
|
||||||
|
// Remove the transactions that will be sent.
|
||||||
|
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
|
||||||
|
if len(s.txs) == 0 {
|
||||||
|
delete(pending, s.p.ID())
|
||||||
|
}
|
||||||
|
// Send the pack in the background.
|
||||||
|
glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)
|
||||||
|
sending = true
|
||||||
|
go func() { done <- pack.p.sendTransactions(pack.txs) }()
|
||||||
|
}
|
||||||
|
|
||||||
|
// pick chooses the next pending sync.
|
||||||
|
pick := func() *txsync {
|
||||||
|
if len(pending) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
n := rand.Intn(len(pending)) + 1
|
||||||
|
for _, s := range pending {
|
||||||
|
if n--; n == 0 {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case s := <-pm.txsyncCh:
|
||||||
|
pending[s.p.ID()] = s
|
||||||
|
if !sending {
|
||||||
|
send(s)
|
||||||
|
}
|
||||||
|
case err := <-done:
|
||||||
|
sending = false
|
||||||
|
// Stop tracking peers that cause send failures.
|
||||||
|
if err != nil {
|
||||||
|
glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err)
|
||||||
|
delete(pending, pack.p.ID())
|
||||||
|
}
|
||||||
|
// Schedule the next send.
|
||||||
|
if s := pick(); s != nil {
|
||||||
|
send(s)
|
||||||
|
}
|
||||||
|
case <-pm.quitSync:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// fetcher is responsible for collecting hash notifications, and periodically
|
// fetcher is responsible for collecting hash notifications, and periodically
|
||||||
// checking all unknown ones and individually fetching them.
|
// checking all unknown ones and individually fetching them.
|
||||||
func (pm *ProtocolManager) fetcher() {
|
func (pm *ProtocolManager) fetcher() {
|
||||||
|
Loading…
Reference in New Issue
Block a user