eth, p2p/msgrate: move peer QoS tracking to its own package and use it for snap (#22876)
This change extracts the peer QoS tracking logic from eth/downloader, moving it into the new package p2p/msgrate. The job of msgrate.Tracker is determining suitable timeout values and request sizes per peer. The snap sync scheduler now uses msgrate.Tracker instead of the hard-coded 15s timeout. This should make the sync work better on network links with high latency.
This commit is contained in:
parent
b3a1fda650
commit
3e795881ea
@ -47,16 +47,6 @@ var (
|
|||||||
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
|
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
|
||||||
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
|
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
|
||||||
|
|
||||||
rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
|
|
||||||
rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
|
|
||||||
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
|
|
||||||
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
|
|
||||||
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
|
|
||||||
|
|
||||||
qosTuningPeers = 5 // Number of peers to tune based on (best peers)
|
|
||||||
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
|
|
||||||
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
|
|
||||||
|
|
||||||
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
|
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
|
||||||
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
|
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
|
||||||
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
|
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
|
||||||
@ -96,13 +86,6 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
|
|
||||||
// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
|
|
||||||
// guaranteed to be so aligned, so take advantage of that. For more information,
|
|
||||||
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
|
||||||
rttEstimate uint64 // Round trip time to target for download requests
|
|
||||||
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
|
|
||||||
|
|
||||||
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
|
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
|
||||||
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
mux *event.TypeMux // Event multiplexer to announce sync operation events
|
||||||
|
|
||||||
@ -232,8 +215,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
|
|||||||
checkpoint: checkpoint,
|
checkpoint: checkpoint,
|
||||||
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
|
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
|
||||||
peers: newPeerSet(),
|
peers: newPeerSet(),
|
||||||
rttEstimate: uint64(rttMaxEstimate),
|
|
||||||
rttConfidence: uint64(1000000),
|
|
||||||
blockchain: chain,
|
blockchain: chain,
|
||||||
lightchain: lightchain,
|
lightchain: lightchain,
|
||||||
dropPeer: dropPeer,
|
dropPeer: dropPeer,
|
||||||
@ -252,7 +233,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
|
|||||||
},
|
},
|
||||||
trackStateReq: make(chan *stateReq),
|
trackStateReq: make(chan *stateReq),
|
||||||
}
|
}
|
||||||
go dl.qosTuner()
|
|
||||||
go dl.stateFetcher()
|
go dl.stateFetcher()
|
||||||
return dl
|
return dl
|
||||||
}
|
}
|
||||||
@ -310,8 +290,6 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
|
|||||||
logger.Error("Failed to register sync peer", "err", err)
|
logger.Error("Failed to register sync peer", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
d.qosReduceConfidence()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -670,7 +648,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
|
|||||||
}
|
}
|
||||||
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
|
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
|
||||||
|
|
||||||
ttl := d.requestTTL()
|
ttl := d.peers.rates.TargetTimeout()
|
||||||
timeout := time.After(ttl)
|
timeout := time.After(ttl)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -853,7 +831,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
|
|||||||
// Wait for the remote response to the head fetch
|
// Wait for the remote response to the head fetch
|
||||||
number, hash := uint64(0), common.Hash{}
|
number, hash := uint64(0), common.Hash{}
|
||||||
|
|
||||||
ttl := d.requestTTL()
|
ttl := d.peers.rates.TargetTimeout()
|
||||||
timeout := time.After(ttl)
|
timeout := time.After(ttl)
|
||||||
|
|
||||||
for finished := false; !finished; {
|
for finished := false; !finished; {
|
||||||
@ -942,7 +920,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
|
|||||||
// Split our chain interval in two, and request the hash to cross check
|
// Split our chain interval in two, and request the hash to cross check
|
||||||
check := (start + end) / 2
|
check := (start + end) / 2
|
||||||
|
|
||||||
ttl := d.requestTTL()
|
ttl := d.peers.rates.TargetTimeout()
|
||||||
timeout := time.After(ttl)
|
timeout := time.After(ttl)
|
||||||
|
|
||||||
go p.peer.RequestHeadersByNumber(check, 1, 0, false)
|
go p.peer.RequestHeadersByNumber(check, 1, 0, false)
|
||||||
@ -1035,7 +1013,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
|
|||||||
getHeaders := func(from uint64) {
|
getHeaders := func(from uint64) {
|
||||||
request = time.Now()
|
request = time.Now()
|
||||||
|
|
||||||
ttl = d.requestTTL()
|
ttl = d.peers.rates.TargetTimeout()
|
||||||
timeout.Reset(ttl)
|
timeout.Reset(ttl)
|
||||||
|
|
||||||
if skeleton {
|
if skeleton {
|
||||||
@ -1050,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
|
|||||||
pivoting = true
|
pivoting = true
|
||||||
request = time.Now()
|
request = time.Now()
|
||||||
|
|
||||||
ttl = d.requestTTL()
|
ttl = d.peers.rates.TargetTimeout()
|
||||||
timeout.Reset(ttl)
|
timeout.Reset(ttl)
|
||||||
|
|
||||||
d.pivotLock.RLock()
|
d.pivotLock.RLock()
|
||||||
@ -1262,12 +1240,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
|
|||||||
pack := packet.(*headerPack)
|
pack := packet.(*headerPack)
|
||||||
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
|
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
|
||||||
}
|
}
|
||||||
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
|
expire = func() map[string]int { return d.queue.ExpireHeaders(d.peers.rates.TargetTimeout()) }
|
||||||
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
|
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
|
||||||
return d.queue.ReserveHeaders(p, count), false, false
|
return d.queue.ReserveHeaders(p, count), false, false
|
||||||
}
|
}
|
||||||
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
|
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
|
||||||
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
|
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.peers.rates.TargetRoundTrip()) }
|
||||||
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
|
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
|
||||||
p.SetHeadersIdle(accepted, deliveryTime)
|
p.SetHeadersIdle(accepted, deliveryTime)
|
||||||
}
|
}
|
||||||
@ -1293,9 +1271,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
|
|||||||
pack := packet.(*bodyPack)
|
pack := packet.(*bodyPack)
|
||||||
return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
|
return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
|
||||||
}
|
}
|
||||||
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
|
expire = func() map[string]int { return d.queue.ExpireBodies(d.peers.rates.TargetTimeout()) }
|
||||||
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
|
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
|
||||||
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
|
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.peers.rates.TargetRoundTrip()) }
|
||||||
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
|
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
|
||||||
)
|
)
|
||||||
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
|
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
|
||||||
@ -1317,9 +1295,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
|
|||||||
pack := packet.(*receiptPack)
|
pack := packet.(*receiptPack)
|
||||||
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
|
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
|
||||||
}
|
}
|
||||||
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
|
expire = func() map[string]int { return d.queue.ExpireReceipts(d.peers.rates.TargetTimeout()) }
|
||||||
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
|
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
|
||||||
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
|
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.peers.rates.TargetRoundTrip()) }
|
||||||
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
|
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
|
||||||
p.SetReceiptsIdle(accepted, deliveryTime)
|
p.SetReceiptsIdle(accepted, deliveryTime)
|
||||||
}
|
}
|
||||||
@ -2031,78 +2009,3 @@ func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dro
|
|||||||
return errNoSyncActive
|
return errNoSyncActive
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// qosTuner is the quality of service tuning loop that occasionally gathers the
|
|
||||||
// peer latency statistics and updates the estimated request round trip time.
|
|
||||||
func (d *Downloader) qosTuner() {
|
|
||||||
for {
|
|
||||||
// Retrieve the current median RTT and integrate into the previoust target RTT
|
|
||||||
rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
|
|
||||||
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
|
|
||||||
|
|
||||||
// A new RTT cycle passed, increase our confidence in the estimated RTT
|
|
||||||
conf := atomic.LoadUint64(&d.rttConfidence)
|
|
||||||
conf = conf + (1000000-conf)/2
|
|
||||||
atomic.StoreUint64(&d.rttConfidence, conf)
|
|
||||||
|
|
||||||
// Log the new QoS values and sleep until the next RTT
|
|
||||||
log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
|
|
||||||
select {
|
|
||||||
case <-d.quitCh:
|
|
||||||
return
|
|
||||||
case <-time.After(rtt):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// qosReduceConfidence is meant to be called when a new peer joins the downloader's
|
|
||||||
// peer set, needing to reduce the confidence we have in out QoS estimates.
|
|
||||||
func (d *Downloader) qosReduceConfidence() {
|
|
||||||
// If we have a single peer, confidence is always 1
|
|
||||||
peers := uint64(d.peers.Len())
|
|
||||||
if peers == 0 {
|
|
||||||
// Ensure peer connectivity races don't catch us off guard
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if peers == 1 {
|
|
||||||
atomic.StoreUint64(&d.rttConfidence, 1000000)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// If we have a ton of peers, don't drop confidence)
|
|
||||||
if peers >= uint64(qosConfidenceCap) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Otherwise drop the confidence factor
|
|
||||||
conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
|
|
||||||
if float64(conf)/1000000 < rttMinConfidence {
|
|
||||||
conf = uint64(rttMinConfidence * 1000000)
|
|
||||||
}
|
|
||||||
atomic.StoreUint64(&d.rttConfidence, conf)
|
|
||||||
|
|
||||||
rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
|
|
||||||
log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
|
|
||||||
}
|
|
||||||
|
|
||||||
// requestRTT returns the current target round trip time for a download request
|
|
||||||
// to complete in.
|
|
||||||
//
|
|
||||||
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
|
|
||||||
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
|
|
||||||
// be adapted to, but smaller ones are preferred (stabler download stream).
|
|
||||||
func (d *Downloader) requestRTT() time.Duration {
|
|
||||||
return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
|
|
||||||
}
|
|
||||||
|
|
||||||
// requestTTL returns the current timeout allowance for a single download request
|
|
||||||
// to finish under.
|
|
||||||
func (d *Downloader) requestTTL() time.Duration {
|
|
||||||
var (
|
|
||||||
rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
|
|
||||||
conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
|
|
||||||
)
|
|
||||||
ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
|
|
||||||
if ttl > ttlLimit {
|
|
||||||
ttl = ttlLimit
|
|
||||||
}
|
|
||||||
return ttl
|
|
||||||
}
|
|
||||||
|
@ -32,11 +32,11 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/msgrate"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
|
maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
|
||||||
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -54,18 +54,12 @@ type peerConnection struct {
|
|||||||
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
|
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
|
||||||
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
|
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
|
||||||
|
|
||||||
headerThroughput float64 // Number of headers measured to be retrievable per second
|
|
||||||
blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
|
|
||||||
receiptThroughput float64 // Number of receipts measured to be retrievable per second
|
|
||||||
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
|
|
||||||
|
|
||||||
rtt time.Duration // Request round trip time to track responsiveness (QoS)
|
|
||||||
|
|
||||||
headerStarted time.Time // Time instance when the last header fetch was started
|
headerStarted time.Time // Time instance when the last header fetch was started
|
||||||
blockStarted time.Time // Time instance when the last block (body) fetch was started
|
blockStarted time.Time // Time instance when the last block (body) fetch was started
|
||||||
receiptStarted time.Time // Time instance when the last receipt fetch was started
|
receiptStarted time.Time // Time instance when the last receipt fetch was started
|
||||||
stateStarted time.Time // Time instance when the last node data fetch was started
|
stateStarted time.Time // Time instance when the last node data fetch was started
|
||||||
|
|
||||||
|
rates *msgrate.Tracker // Tracker to hone in on the number of items retrievable per second
|
||||||
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
|
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
|
||||||
|
|
||||||
peer Peer
|
peer Peer
|
||||||
@ -133,11 +127,6 @@ func (p *peerConnection) Reset() {
|
|||||||
atomic.StoreInt32(&p.receiptIdle, 0)
|
atomic.StoreInt32(&p.receiptIdle, 0)
|
||||||
atomic.StoreInt32(&p.stateIdle, 0)
|
atomic.StoreInt32(&p.stateIdle, 0)
|
||||||
|
|
||||||
p.headerThroughput = 0
|
|
||||||
p.blockThroughput = 0
|
|
||||||
p.receiptThroughput = 0
|
|
||||||
p.stateThroughput = 0
|
|
||||||
|
|
||||||
p.lacking = make(map[common.Hash]struct{})
|
p.lacking = make(map[common.Hash]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,93 +201,72 @@ func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
|
|||||||
// requests. Its estimated header retrieval throughput is updated with that measured
|
// requests. Its estimated header retrieval throughput is updated with that measured
|
||||||
// just now.
|
// just now.
|
||||||
func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) {
|
func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) {
|
||||||
p.setIdle(deliveryTime.Sub(p.headerStarted), delivered, &p.headerThroughput, &p.headerIdle)
|
p.rates.Update(eth.BlockHeadersMsg, deliveryTime.Sub(p.headerStarted), delivered)
|
||||||
|
atomic.StoreInt32(&p.headerIdle, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
|
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
|
||||||
// requests. Its estimated body retrieval throughput is updated with that measured
|
// requests. Its estimated body retrieval throughput is updated with that measured
|
||||||
// just now.
|
// just now.
|
||||||
func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) {
|
func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) {
|
||||||
p.setIdle(deliveryTime.Sub(p.blockStarted), delivered, &p.blockThroughput, &p.blockIdle)
|
p.rates.Update(eth.BlockBodiesMsg, deliveryTime.Sub(p.blockStarted), delivered)
|
||||||
|
atomic.StoreInt32(&p.blockIdle, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
|
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
|
||||||
// retrieval requests. Its estimated receipt retrieval throughput is updated
|
// retrieval requests. Its estimated receipt retrieval throughput is updated
|
||||||
// with that measured just now.
|
// with that measured just now.
|
||||||
func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) {
|
func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) {
|
||||||
p.setIdle(deliveryTime.Sub(p.receiptStarted), delivered, &p.receiptThroughput, &p.receiptIdle)
|
p.rates.Update(eth.ReceiptsMsg, deliveryTime.Sub(p.receiptStarted), delivered)
|
||||||
|
atomic.StoreInt32(&p.receiptIdle, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
|
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
|
||||||
// data retrieval requests. Its estimated state retrieval throughput is updated
|
// data retrieval requests. Its estimated state retrieval throughput is updated
|
||||||
// with that measured just now.
|
// with that measured just now.
|
||||||
func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) {
|
func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) {
|
||||||
p.setIdle(deliveryTime.Sub(p.stateStarted), delivered, &p.stateThroughput, &p.stateIdle)
|
p.rates.Update(eth.NodeDataMsg, deliveryTime.Sub(p.stateStarted), delivered)
|
||||||
}
|
atomic.StoreInt32(&p.stateIdle, 0)
|
||||||
|
|
||||||
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
|
|
||||||
// Its estimated retrieval throughput is updated with that measured just now.
|
|
||||||
func (p *peerConnection) setIdle(elapsed time.Duration, delivered int, throughput *float64, idle *int32) {
|
|
||||||
// Irrelevant of the scaling, make sure the peer ends up idle
|
|
||||||
defer atomic.StoreInt32(idle, 0)
|
|
||||||
|
|
||||||
p.lock.Lock()
|
|
||||||
defer p.lock.Unlock()
|
|
||||||
|
|
||||||
// If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
|
|
||||||
if delivered == 0 {
|
|
||||||
*throughput = 0
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Otherwise update the throughput with a new measurement
|
|
||||||
if elapsed <= 0 {
|
|
||||||
elapsed = 1 // +1 (ns) to ensure non-zero divisor
|
|
||||||
}
|
|
||||||
measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
|
|
||||||
|
|
||||||
*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
|
|
||||||
p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
|
|
||||||
|
|
||||||
p.log.Trace("Peer throughput measurements updated",
|
|
||||||
"hps", p.headerThroughput, "bps", p.blockThroughput,
|
|
||||||
"rps", p.receiptThroughput, "sps", p.stateThroughput,
|
|
||||||
"miss", len(p.lacking), "rtt", p.rtt)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeaderCapacity retrieves the peers header download allowance based on its
|
// HeaderCapacity retrieves the peers header download allowance based on its
|
||||||
// previously discovered throughput.
|
// previously discovered throughput.
|
||||||
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
|
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
|
||||||
p.lock.RLock()
|
cap := int(math.Ceil(p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)))
|
||||||
defer p.lock.RUnlock()
|
if cap > MaxHeaderFetch {
|
||||||
|
cap = MaxHeaderFetch
|
||||||
return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
|
}
|
||||||
|
return cap
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockCapacity retrieves the peers block download allowance based on its
|
// BlockCapacity retrieves the peers block download allowance based on its
|
||||||
// previously discovered throughput.
|
// previously discovered throughput.
|
||||||
func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
|
func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
|
||||||
p.lock.RLock()
|
cap := int(math.Ceil(p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)))
|
||||||
defer p.lock.RUnlock()
|
if cap > MaxBlockFetch {
|
||||||
|
cap = MaxBlockFetch
|
||||||
return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
|
}
|
||||||
|
return cap
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReceiptCapacity retrieves the peers receipt download allowance based on its
|
// ReceiptCapacity retrieves the peers receipt download allowance based on its
|
||||||
// previously discovered throughput.
|
// previously discovered throughput.
|
||||||
func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
|
func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
|
||||||
p.lock.RLock()
|
cap := int(math.Ceil(p.rates.Capacity(eth.ReceiptsMsg, targetRTT)))
|
||||||
defer p.lock.RUnlock()
|
if cap > MaxReceiptFetch {
|
||||||
|
cap = MaxReceiptFetch
|
||||||
return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
|
}
|
||||||
|
return cap
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeDataCapacity retrieves the peers state download allowance based on its
|
// NodeDataCapacity retrieves the peers state download allowance based on its
|
||||||
// previously discovered throughput.
|
// previously discovered throughput.
|
||||||
func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
|
func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
|
||||||
p.lock.RLock()
|
cap := int(math.Ceil(p.rates.Capacity(eth.NodeDataMsg, targetRTT)))
|
||||||
defer p.lock.RUnlock()
|
if cap > MaxStateFetch {
|
||||||
|
cap = MaxStateFetch
|
||||||
return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
|
}
|
||||||
|
return cap
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
|
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
|
||||||
@ -331,8 +299,11 @@ func (p *peerConnection) Lacks(hash common.Hash) bool {
|
|||||||
// download procedure.
|
// download procedure.
|
||||||
type peerSet struct {
|
type peerSet struct {
|
||||||
peers map[string]*peerConnection
|
peers map[string]*peerConnection
|
||||||
|
rates *msgrate.Trackers // Set of rate trackers to give the sync a common beat
|
||||||
|
|
||||||
newPeerFeed event.Feed
|
newPeerFeed event.Feed
|
||||||
peerDropFeed event.Feed
|
peerDropFeed event.Feed
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,6 +311,7 @@ type peerSet struct {
|
|||||||
func newPeerSet() *peerSet {
|
func newPeerSet() *peerSet {
|
||||||
return &peerSet{
|
return &peerSet{
|
||||||
peers: make(map[string]*peerConnection),
|
peers: make(map[string]*peerConnection),
|
||||||
|
rates: msgrate.NewTrackers(log.New("proto", "eth")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,30 +343,15 @@ func (ps *peerSet) Reset() {
|
|||||||
// average of all existing peers, to give it a realistic chance of being used
|
// average of all existing peers, to give it a realistic chance of being used
|
||||||
// for data retrievals.
|
// for data retrievals.
|
||||||
func (ps *peerSet) Register(p *peerConnection) error {
|
func (ps *peerSet) Register(p *peerConnection) error {
|
||||||
// Retrieve the current median RTT as a sane default
|
|
||||||
p.rtt = ps.medianRTT()
|
|
||||||
|
|
||||||
// Register the new peer with some meaningful defaults
|
// Register the new peer with some meaningful defaults
|
||||||
ps.lock.Lock()
|
ps.lock.Lock()
|
||||||
if _, ok := ps.peers[p.id]; ok {
|
if _, ok := ps.peers[p.id]; ok {
|
||||||
ps.lock.Unlock()
|
ps.lock.Unlock()
|
||||||
return errAlreadyRegistered
|
return errAlreadyRegistered
|
||||||
}
|
}
|
||||||
if len(ps.peers) > 0 {
|
p.rates = msgrate.NewTracker(ps.rates.MeanCapacities(), ps.rates.MedianRoundTrip())
|
||||||
p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
|
if err := ps.rates.Track(p.id, p.rates); err != nil {
|
||||||
|
return err
|
||||||
for _, peer := range ps.peers {
|
|
||||||
peer.lock.RLock()
|
|
||||||
p.headerThroughput += peer.headerThroughput
|
|
||||||
p.blockThroughput += peer.blockThroughput
|
|
||||||
p.receiptThroughput += peer.receiptThroughput
|
|
||||||
p.stateThroughput += peer.stateThroughput
|
|
||||||
peer.lock.RUnlock()
|
|
||||||
}
|
|
||||||
p.headerThroughput /= float64(len(ps.peers))
|
|
||||||
p.blockThroughput /= float64(len(ps.peers))
|
|
||||||
p.receiptThroughput /= float64(len(ps.peers))
|
|
||||||
p.stateThroughput /= float64(len(ps.peers))
|
|
||||||
}
|
}
|
||||||
ps.peers[p.id] = p
|
ps.peers[p.id] = p
|
||||||
ps.lock.Unlock()
|
ps.lock.Unlock()
|
||||||
@ -413,6 +370,7 @@ func (ps *peerSet) Unregister(id string) error {
|
|||||||
return errNotRegistered
|
return errNotRegistered
|
||||||
}
|
}
|
||||||
delete(ps.peers, id)
|
delete(ps.peers, id)
|
||||||
|
ps.rates.Untrack(id)
|
||||||
ps.lock.Unlock()
|
ps.lock.Unlock()
|
||||||
|
|
||||||
ps.peerDropFeed.Send(p)
|
ps.peerDropFeed.Send(p)
|
||||||
@ -454,9 +412,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
|
|||||||
return atomic.LoadInt32(&p.headerIdle) == 0
|
return atomic.LoadInt32(&p.headerIdle) == 0
|
||||||
}
|
}
|
||||||
throughput := func(p *peerConnection) float64 {
|
throughput := func(p *peerConnection) float64 {
|
||||||
p.lock.RLock()
|
return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
|
||||||
defer p.lock.RUnlock()
|
|
||||||
return p.headerThroughput
|
|
||||||
}
|
}
|
||||||
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
||||||
}
|
}
|
||||||
@ -468,9 +424,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
|
|||||||
return atomic.LoadInt32(&p.blockIdle) == 0
|
return atomic.LoadInt32(&p.blockIdle) == 0
|
||||||
}
|
}
|
||||||
throughput := func(p *peerConnection) float64 {
|
throughput := func(p *peerConnection) float64 {
|
||||||
p.lock.RLock()
|
return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
|
||||||
defer p.lock.RUnlock()
|
|
||||||
return p.blockThroughput
|
|
||||||
}
|
}
|
||||||
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
||||||
}
|
}
|
||||||
@ -482,9 +436,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
|
|||||||
return atomic.LoadInt32(&p.receiptIdle) == 0
|
return atomic.LoadInt32(&p.receiptIdle) == 0
|
||||||
}
|
}
|
||||||
throughput := func(p *peerConnection) float64 {
|
throughput := func(p *peerConnection) float64 {
|
||||||
p.lock.RLock()
|
return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
|
||||||
defer p.lock.RUnlock()
|
|
||||||
return p.receiptThroughput
|
|
||||||
}
|
}
|
||||||
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
||||||
}
|
}
|
||||||
@ -496,9 +448,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
|
|||||||
return atomic.LoadInt32(&p.stateIdle) == 0
|
return atomic.LoadInt32(&p.stateIdle) == 0
|
||||||
}
|
}
|
||||||
throughput := func(p *peerConnection) float64 {
|
throughput := func(p *peerConnection) float64 {
|
||||||
p.lock.RLock()
|
return p.rates.Capacity(eth.NodeDataMsg, time.Second)
|
||||||
defer p.lock.RUnlock()
|
|
||||||
return p.stateThroughput
|
|
||||||
}
|
}
|
||||||
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
|
||||||
}
|
}
|
||||||
@ -527,37 +477,6 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peer
|
|||||||
return sortPeers.p, total
|
return sortPeers.p, total
|
||||||
}
|
}
|
||||||
|
|
||||||
// medianRTT returns the median RTT of the peerset, considering only the tuning
|
|
||||||
// peers if there are more peers available.
|
|
||||||
func (ps *peerSet) medianRTT() time.Duration {
|
|
||||||
// Gather all the currently measured round trip times
|
|
||||||
ps.lock.RLock()
|
|
||||||
defer ps.lock.RUnlock()
|
|
||||||
|
|
||||||
rtts := make([]float64, 0, len(ps.peers))
|
|
||||||
for _, p := range ps.peers {
|
|
||||||
p.lock.RLock()
|
|
||||||
rtts = append(rtts, float64(p.rtt))
|
|
||||||
p.lock.RUnlock()
|
|
||||||
}
|
|
||||||
sort.Float64s(rtts)
|
|
||||||
|
|
||||||
median := rttMaxEstimate
|
|
||||||
if qosTuningPeers <= len(rtts) {
|
|
||||||
median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
|
|
||||||
} else if len(rtts) > 0 {
|
|
||||||
median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
|
|
||||||
}
|
|
||||||
// Restrict the RTT into some QoS defaults, irrelevant of true RTT
|
|
||||||
if median < rttMinEstimate {
|
|
||||||
median = rttMinEstimate
|
|
||||||
}
|
|
||||||
if median > rttMaxEstimate {
|
|
||||||
median = rttMaxEstimate
|
|
||||||
}
|
|
||||||
return median
|
|
||||||
}
|
|
||||||
|
|
||||||
// peerThroughputSort implements the Sort interface, and allows for
|
// peerThroughputSort implements the Sort interface, and allows for
|
||||||
// sorting a set of peers by their throughput
|
// sorting a set of peers by their throughput
|
||||||
// The sorted data is with the _highest_ throughput first
|
// The sorted data is with the _highest_ throughput first
|
||||||
|
@ -1,53 +0,0 @@
|
|||||||
// Copyright 2020 The go-ethereum Authors
|
|
||||||
// This file is part of go-ethereum.
|
|
||||||
//
|
|
||||||
// go-ethereum is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// go-ethereum is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package downloader
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sort"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPeerThroughputSorting(t *testing.T) {
|
|
||||||
a := &peerConnection{
|
|
||||||
id: "a",
|
|
||||||
headerThroughput: 1.25,
|
|
||||||
}
|
|
||||||
b := &peerConnection{
|
|
||||||
id: "b",
|
|
||||||
headerThroughput: 1.21,
|
|
||||||
}
|
|
||||||
c := &peerConnection{
|
|
||||||
id: "c",
|
|
||||||
headerThroughput: 1.23,
|
|
||||||
}
|
|
||||||
|
|
||||||
peers := []*peerConnection{a, b, c}
|
|
||||||
tps := []float64{a.headerThroughput,
|
|
||||||
b.headerThroughput, c.headerThroughput}
|
|
||||||
sortPeers := &peerThroughputSort{peers, tps}
|
|
||||||
sort.Sort(sortPeers)
|
|
||||||
if got, exp := sortPeers.p[0].id, "a"; got != exp {
|
|
||||||
t.Errorf("sort fail, got %v exp %v", got, exp)
|
|
||||||
}
|
|
||||||
if got, exp := sortPeers.p[1].id, "c"; got != exp {
|
|
||||||
t.Errorf("sort fail, got %v exp %v", got, exp)
|
|
||||||
}
|
|
||||||
if got, exp := sortPeers.p[2].id, "b"; got != exp {
|
|
||||||
t.Errorf("sort fail, got %v exp %v", got, exp)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -433,8 +433,8 @@ func (s *stateSync) assignTasks() {
|
|||||||
peers, _ := s.d.peers.NodeDataIdlePeers()
|
peers, _ := s.d.peers.NodeDataIdlePeers()
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
// Assign a batch of fetches proportional to the estimated latency/bandwidth
|
// Assign a batch of fetches proportional to the estimated latency/bandwidth
|
||||||
cap := p.NodeDataCapacity(s.d.requestRTT())
|
cap := p.NodeDataCapacity(s.d.peers.rates.TargetRoundTrip())
|
||||||
req := &stateReq{peer: p, timeout: s.d.requestTTL()}
|
req := &stateReq{peer: p, timeout: s.d.peers.rates.TargetTimeout()}
|
||||||
|
|
||||||
nodes, _, codes := s.fillTasks(cap, req)
|
nodes, _, codes := s.fillTasks(cap, req)
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/light"
|
"github.com/ethereum/go-ethereum/light"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/msgrate"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
"golang.org/x/crypto/sha3"
|
"golang.org/x/crypto/sha3"
|
||||||
@ -51,14 +52,15 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// maxRequestSize is the maximum number of bytes to request from a remote peer.
|
// minRequestSize is the minimum number of bytes to request from a remote peer.
|
||||||
maxRequestSize = 128 * 1024
|
// This number is used as the low cap for account and storage range requests.
|
||||||
|
// Bytecode and trienode are limited inherently by item count (1).
|
||||||
|
minRequestSize = 64 * 1024
|
||||||
|
|
||||||
// maxStorageSetRequestCount is the maximum number of contracts to request the
|
// maxRequestSize is the maximum number of bytes to request from a remote peer.
|
||||||
// storage of in a single query. If this number is too low, we're not filling
|
// This number is used as the high cap for account and storage range requests.
|
||||||
// responses fully and waste round trip times. If it's too high, we're capping
|
// Bytecode and trienode are limited more explicitly by the caps below.
|
||||||
// responses and waste bandwidth.
|
maxRequestSize = 512 * 1024
|
||||||
maxStorageSetRequestCount = maxRequestSize / 1024
|
|
||||||
|
|
||||||
// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
|
// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
|
||||||
// single query. If this number is too low, we're not filling responses fully
|
// single query. If this number is too low, we're not filling responses fully
|
||||||
@ -74,7 +76,7 @@ const (
|
|||||||
// a single query. If this number is too low, we're not filling responses fully
|
// a single query. If this number is too low, we're not filling responses fully
|
||||||
// and waste round trip times. If it's too high, we're capping responses and
|
// and waste round trip times. If it's too high, we're capping responses and
|
||||||
// waste bandwidth.
|
// waste bandwidth.
|
||||||
maxTrieRequestCount = 256
|
maxTrieRequestCount = maxRequestSize / 512
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -85,10 +87,6 @@ var (
|
|||||||
// storageConcurrency is the number of chunks to split the a large contract
|
// storageConcurrency is the number of chunks to split the a large contract
|
||||||
// storage trie into to allow concurrent retrievals.
|
// storage trie into to allow concurrent retrievals.
|
||||||
storageConcurrency = 16
|
storageConcurrency = 16
|
||||||
|
|
||||||
// requestTimeout is the maximum time a peer is allowed to spend on serving
|
|
||||||
// a single network request.
|
|
||||||
requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrCancelled is returned from snap syncing if the operation was prematurely
|
// ErrCancelled is returned from snap syncing if the operation was prematurely
|
||||||
@ -107,6 +105,7 @@ var ErrCancelled = errors.New("sync cancelled")
|
|||||||
type accountRequest struct {
|
type accountRequest struct {
|
||||||
peer string // Peer to which this request is assigned
|
peer string // Peer to which this request is assigned
|
||||||
id uint64 // Request ID of this request
|
id uint64 // Request ID of this request
|
||||||
|
time time.Time // Timestamp when the request was sent
|
||||||
|
|
||||||
deliver chan *accountResponse // Channel to deliver successful response on
|
deliver chan *accountResponse // Channel to deliver successful response on
|
||||||
revert chan *accountRequest // Channel to deliver request failure on
|
revert chan *accountRequest // Channel to deliver request failure on
|
||||||
@ -144,6 +143,7 @@ type accountResponse struct {
|
|||||||
type bytecodeRequest struct {
|
type bytecodeRequest struct {
|
||||||
peer string // Peer to which this request is assigned
|
peer string // Peer to which this request is assigned
|
||||||
id uint64 // Request ID of this request
|
id uint64 // Request ID of this request
|
||||||
|
time time.Time // Timestamp when the request was sent
|
||||||
|
|
||||||
deliver chan *bytecodeResponse // Channel to deliver successful response on
|
deliver chan *bytecodeResponse // Channel to deliver successful response on
|
||||||
revert chan *bytecodeRequest // Channel to deliver request failure on
|
revert chan *bytecodeRequest // Channel to deliver request failure on
|
||||||
@ -175,6 +175,7 @@ type bytecodeResponse struct {
|
|||||||
type storageRequest struct {
|
type storageRequest struct {
|
||||||
peer string // Peer to which this request is assigned
|
peer string // Peer to which this request is assigned
|
||||||
id uint64 // Request ID of this request
|
id uint64 // Request ID of this request
|
||||||
|
time time.Time // Timestamp when the request was sent
|
||||||
|
|
||||||
deliver chan *storageResponse // Channel to deliver successful response on
|
deliver chan *storageResponse // Channel to deliver successful response on
|
||||||
revert chan *storageRequest // Channel to deliver request failure on
|
revert chan *storageRequest // Channel to deliver request failure on
|
||||||
@ -220,6 +221,7 @@ type storageResponse struct {
|
|||||||
type trienodeHealRequest struct {
|
type trienodeHealRequest struct {
|
||||||
peer string // Peer to which this request is assigned
|
peer string // Peer to which this request is assigned
|
||||||
id uint64 // Request ID of this request
|
id uint64 // Request ID of this request
|
||||||
|
time time.Time // Timestamp when the request was sent
|
||||||
|
|
||||||
deliver chan *trienodeHealResponse // Channel to deliver successful response on
|
deliver chan *trienodeHealResponse // Channel to deliver successful response on
|
||||||
revert chan *trienodeHealRequest // Channel to deliver request failure on
|
revert chan *trienodeHealRequest // Channel to deliver request failure on
|
||||||
@ -254,6 +256,7 @@ type trienodeHealResponse struct {
|
|||||||
type bytecodeHealRequest struct {
|
type bytecodeHealRequest struct {
|
||||||
peer string // Peer to which this request is assigned
|
peer string // Peer to which this request is assigned
|
||||||
id uint64 // Request ID of this request
|
id uint64 // Request ID of this request
|
||||||
|
time time.Time // Timestamp when the request was sent
|
||||||
|
|
||||||
deliver chan *bytecodeHealResponse // Channel to deliver successful response on
|
deliver chan *bytecodeHealResponse // Channel to deliver successful response on
|
||||||
revert chan *bytecodeHealRequest // Channel to deliver request failure on
|
revert chan *bytecodeHealRequest // Channel to deliver request failure on
|
||||||
@ -396,6 +399,7 @@ type Syncer struct {
|
|||||||
peers map[string]SyncPeer // Currently active peers to download from
|
peers map[string]SyncPeer // Currently active peers to download from
|
||||||
peerJoin *event.Feed // Event feed to react to peers joining
|
peerJoin *event.Feed // Event feed to react to peers joining
|
||||||
peerDrop *event.Feed // Event feed to react to peers dropping
|
peerDrop *event.Feed // Event feed to react to peers dropping
|
||||||
|
rates *msgrate.Trackers // Message throughput rates for peers
|
||||||
|
|
||||||
// Request tracking during syncing phase
|
// Request tracking during syncing phase
|
||||||
statelessPeers map[string]struct{} // Peers that failed to deliver state data
|
statelessPeers map[string]struct{} // Peers that failed to deliver state data
|
||||||
@ -452,6 +456,7 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
|
|||||||
peers: make(map[string]SyncPeer),
|
peers: make(map[string]SyncPeer),
|
||||||
peerJoin: new(event.Feed),
|
peerJoin: new(event.Feed),
|
||||||
peerDrop: new(event.Feed),
|
peerDrop: new(event.Feed),
|
||||||
|
rates: msgrate.NewTrackers(log.New("proto", "snap")),
|
||||||
update: make(chan struct{}, 1),
|
update: make(chan struct{}, 1),
|
||||||
|
|
||||||
accountIdlers: make(map[string]struct{}),
|
accountIdlers: make(map[string]struct{}),
|
||||||
@ -484,6 +489,7 @@ func (s *Syncer) Register(peer SyncPeer) error {
|
|||||||
return errors.New("already registered")
|
return errors.New("already registered")
|
||||||
}
|
}
|
||||||
s.peers[id] = peer
|
s.peers[id] = peer
|
||||||
|
s.rates.Track(id, msgrate.NewTracker(s.rates.MeanCapacities(), s.rates.MedianRoundTrip()))
|
||||||
|
|
||||||
// Mark the peer as idle, even if no sync is running
|
// Mark the peer as idle, even if no sync is running
|
||||||
s.accountIdlers[id] = struct{}{}
|
s.accountIdlers[id] = struct{}{}
|
||||||
@ -509,6 +515,7 @@ func (s *Syncer) Unregister(id string) error {
|
|||||||
return errors.New("not registered")
|
return errors.New("not registered")
|
||||||
}
|
}
|
||||||
delete(s.peers, id)
|
delete(s.peers, id)
|
||||||
|
s.rates.Untrack(id)
|
||||||
|
|
||||||
// Remove status markers, even if no sync is running
|
// Remove status markers, even if no sync is running
|
||||||
delete(s.statelessPeers, id)
|
delete(s.statelessPeers, id)
|
||||||
@ -851,10 +858,24 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
|
|||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// If there are no idle peers, short circuit assignment
|
// Sort the peers by download capacity to use faster ones if many available
|
||||||
if len(s.accountIdlers) == 0 {
|
idlers := &capacitySort{
|
||||||
|
ids: make([]string, 0, len(s.accountIdlers)),
|
||||||
|
caps: make([]float64, 0, len(s.accountIdlers)),
|
||||||
|
}
|
||||||
|
targetTTL := s.rates.TargetTimeout()
|
||||||
|
for id := range s.accountIdlers {
|
||||||
|
if _, ok := s.statelessPeers[id]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idlers.ids = append(idlers.ids, id)
|
||||||
|
idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
|
||||||
|
}
|
||||||
|
if len(idlers.ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
sort.Sort(sort.Reverse(idlers))
|
||||||
|
|
||||||
// Iterate over all the tasks and try to find a pending one
|
// Iterate over all the tasks and try to find a pending one
|
||||||
for _, task := range s.tasks {
|
for _, task := range s.tasks {
|
||||||
// Skip any tasks already filling
|
// Skip any tasks already filling
|
||||||
@ -864,20 +885,15 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
|
|||||||
// Task pending retrieval, try to find an idle peer. If no such peer
|
// Task pending retrieval, try to find an idle peer. If no such peer
|
||||||
// exists, we probably assigned tasks for all (or they are stateless).
|
// exists, we probably assigned tasks for all (or they are stateless).
|
||||||
// Abort the entire assignment mechanism.
|
// Abort the entire assignment mechanism.
|
||||||
var idle string
|
if len(idlers.ids) == 0 {
|
||||||
for id := range s.accountIdlers {
|
|
||||||
// If the peer rejected a query in this sync cycle, don't bother asking
|
|
||||||
// again for anything, it's either out of sync or already pruned
|
|
||||||
if _, ok := s.statelessPeers[id]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
idle = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if idle == "" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peer := s.peers[idle]
|
var (
|
||||||
|
idle = idlers.ids[0]
|
||||||
|
peer = s.peers[idle]
|
||||||
|
cap = idlers.caps[0]
|
||||||
|
)
|
||||||
|
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||||
|
|
||||||
// Matched a pending task to an idle peer, allocate a unique request id
|
// Matched a pending task to an idle peer, allocate a unique request id
|
||||||
var reqid uint64
|
var reqid uint64
|
||||||
@ -895,6 +911,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
|
|||||||
req := &accountRequest{
|
req := &accountRequest{
|
||||||
peer: idle,
|
peer: idle,
|
||||||
id: reqid,
|
id: reqid,
|
||||||
|
time: time.Now(),
|
||||||
deliver: success,
|
deliver: success,
|
||||||
revert: fail,
|
revert: fail,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -903,8 +920,9 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
|
|||||||
limit: task.Last,
|
limit: task.Last,
|
||||||
task: task,
|
task: task,
|
||||||
}
|
}
|
||||||
req.timeout = time.AfterFunc(requestTimeout, func() {
|
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
|
||||||
peer.Log().Debug("Account range request timed out", "reqid", reqid)
|
peer.Log().Debug("Account range request timed out", "reqid", reqid)
|
||||||
|
s.rates.Update(idle, AccountRangeMsg, 0, 0)
|
||||||
s.scheduleRevertAccountRequest(req)
|
s.scheduleRevertAccountRequest(req)
|
||||||
})
|
})
|
||||||
s.accountReqs[reqid] = req
|
s.accountReqs[reqid] = req
|
||||||
@ -915,7 +933,13 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
|
|||||||
defer s.pend.Done()
|
defer s.pend.Done()
|
||||||
|
|
||||||
// Attempt to send the remote request and revert if it fails
|
// Attempt to send the remote request and revert if it fails
|
||||||
if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil {
|
if cap > maxRequestSize {
|
||||||
|
cap = maxRequestSize
|
||||||
|
}
|
||||||
|
if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
|
||||||
|
cap = minRequestSize
|
||||||
|
}
|
||||||
|
if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, uint64(cap)); err != nil {
|
||||||
peer.Log().Debug("Failed to request account range", "err", err)
|
peer.Log().Debug("Failed to request account range", "err", err)
|
||||||
s.scheduleRevertAccountRequest(req)
|
s.scheduleRevertAccountRequest(req)
|
||||||
}
|
}
|
||||||
@ -931,10 +955,24 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
|
|||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// If there are no idle peers, short circuit assignment
|
// Sort the peers by download capacity to use faster ones if many available
|
||||||
if len(s.bytecodeIdlers) == 0 {
|
idlers := &capacitySort{
|
||||||
|
ids: make([]string, 0, len(s.bytecodeIdlers)),
|
||||||
|
caps: make([]float64, 0, len(s.bytecodeIdlers)),
|
||||||
|
}
|
||||||
|
targetTTL := s.rates.TargetTimeout()
|
||||||
|
for id := range s.bytecodeIdlers {
|
||||||
|
if _, ok := s.statelessPeers[id]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idlers.ids = append(idlers.ids, id)
|
||||||
|
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
|
||||||
|
}
|
||||||
|
if len(idlers.ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
sort.Sort(sort.Reverse(idlers))
|
||||||
|
|
||||||
// Iterate over all the tasks and try to find a pending one
|
// Iterate over all the tasks and try to find a pending one
|
||||||
for _, task := range s.tasks {
|
for _, task := range s.tasks {
|
||||||
// Skip any tasks not in the bytecode retrieval phase
|
// Skip any tasks not in the bytecode retrieval phase
|
||||||
@ -948,20 +986,15 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
|
|||||||
// Task pending retrieval, try to find an idle peer. If no such peer
|
// Task pending retrieval, try to find an idle peer. If no such peer
|
||||||
// exists, we probably assigned tasks for all (or they are stateless).
|
// exists, we probably assigned tasks for all (or they are stateless).
|
||||||
// Abort the entire assignment mechanism.
|
// Abort the entire assignment mechanism.
|
||||||
var idle string
|
if len(idlers.ids) == 0 {
|
||||||
for id := range s.bytecodeIdlers {
|
|
||||||
// If the peer rejected a query in this sync cycle, don't bother asking
|
|
||||||
// again for anything, it's either out of sync or already pruned
|
|
||||||
if _, ok := s.statelessPeers[id]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
idle = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if idle == "" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peer := s.peers[idle]
|
var (
|
||||||
|
idle = idlers.ids[0]
|
||||||
|
peer = s.peers[idle]
|
||||||
|
cap = idlers.caps[0]
|
||||||
|
)
|
||||||
|
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||||
|
|
||||||
// Matched a pending task to an idle peer, allocate a unique request id
|
// Matched a pending task to an idle peer, allocate a unique request id
|
||||||
var reqid uint64
|
var reqid uint64
|
||||||
@ -976,17 +1009,21 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Generate the network query and send it to the peer
|
// Generate the network query and send it to the peer
|
||||||
hashes := make([]common.Hash, 0, maxCodeRequestCount)
|
if cap > maxCodeRequestCount {
|
||||||
|
cap = maxCodeRequestCount
|
||||||
|
}
|
||||||
|
hashes := make([]common.Hash, 0, int(cap))
|
||||||
for hash := range task.codeTasks {
|
for hash := range task.codeTasks {
|
||||||
delete(task.codeTasks, hash)
|
delete(task.codeTasks, hash)
|
||||||
hashes = append(hashes, hash)
|
hashes = append(hashes, hash)
|
||||||
if len(hashes) >= maxCodeRequestCount {
|
if len(hashes) >= int(cap) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req := &bytecodeRequest{
|
req := &bytecodeRequest{
|
||||||
peer: idle,
|
peer: idle,
|
||||||
id: reqid,
|
id: reqid,
|
||||||
|
time: time.Now(),
|
||||||
deliver: success,
|
deliver: success,
|
||||||
revert: fail,
|
revert: fail,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -994,8 +1031,9 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
|
|||||||
hashes: hashes,
|
hashes: hashes,
|
||||||
task: task,
|
task: task,
|
||||||
}
|
}
|
||||||
req.timeout = time.AfterFunc(requestTimeout, func() {
|
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
|
||||||
peer.Log().Debug("Bytecode request timed out", "reqid", reqid)
|
peer.Log().Debug("Bytecode request timed out", "reqid", reqid)
|
||||||
|
s.rates.Update(idle, ByteCodesMsg, 0, 0)
|
||||||
s.scheduleRevertBytecodeRequest(req)
|
s.scheduleRevertBytecodeRequest(req)
|
||||||
})
|
})
|
||||||
s.bytecodeReqs[reqid] = req
|
s.bytecodeReqs[reqid] = req
|
||||||
@ -1020,10 +1058,24 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// If there are no idle peers, short circuit assignment
|
// Sort the peers by download capacity to use faster ones if many available
|
||||||
if len(s.storageIdlers) == 0 {
|
idlers := &capacitySort{
|
||||||
|
ids: make([]string, 0, len(s.storageIdlers)),
|
||||||
|
caps: make([]float64, 0, len(s.storageIdlers)),
|
||||||
|
}
|
||||||
|
targetTTL := s.rates.TargetTimeout()
|
||||||
|
for id := range s.storageIdlers {
|
||||||
|
if _, ok := s.statelessPeers[id]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idlers.ids = append(idlers.ids, id)
|
||||||
|
idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL))
|
||||||
|
}
|
||||||
|
if len(idlers.ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
sort.Sort(sort.Reverse(idlers))
|
||||||
|
|
||||||
// Iterate over all the tasks and try to find a pending one
|
// Iterate over all the tasks and try to find a pending one
|
||||||
for _, task := range s.tasks {
|
for _, task := range s.tasks {
|
||||||
// Skip any tasks not in the storage retrieval phase
|
// Skip any tasks not in the storage retrieval phase
|
||||||
@ -1037,20 +1089,15 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
// Task pending retrieval, try to find an idle peer. If no such peer
|
// Task pending retrieval, try to find an idle peer. If no such peer
|
||||||
// exists, we probably assigned tasks for all (or they are stateless).
|
// exists, we probably assigned tasks for all (or they are stateless).
|
||||||
// Abort the entire assignment mechanism.
|
// Abort the entire assignment mechanism.
|
||||||
var idle string
|
if len(idlers.ids) == 0 {
|
||||||
for id := range s.storageIdlers {
|
|
||||||
// If the peer rejected a query in this sync cycle, don't bother asking
|
|
||||||
// again for anything, it's either out of sync or already pruned
|
|
||||||
if _, ok := s.statelessPeers[id]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
idle = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if idle == "" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peer := s.peers[idle]
|
var (
|
||||||
|
idle = idlers.ids[0]
|
||||||
|
peer = s.peers[idle]
|
||||||
|
cap = idlers.caps[0]
|
||||||
|
)
|
||||||
|
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||||
|
|
||||||
// Matched a pending task to an idle peer, allocate a unique request id
|
// Matched a pending task to an idle peer, allocate a unique request id
|
||||||
var reqid uint64
|
var reqid uint64
|
||||||
@ -1067,9 +1114,17 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
// Generate the network query and send it to the peer. If there are
|
// Generate the network query and send it to the peer. If there are
|
||||||
// large contract tasks pending, complete those before diving into
|
// large contract tasks pending, complete those before diving into
|
||||||
// even more new contracts.
|
// even more new contracts.
|
||||||
|
if cap > maxRequestSize {
|
||||||
|
cap = maxRequestSize
|
||||||
|
}
|
||||||
|
if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
|
||||||
|
cap = minRequestSize
|
||||||
|
}
|
||||||
|
storageSets := int(cap / 1024)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
accounts = make([]common.Hash, 0, maxStorageSetRequestCount)
|
accounts = make([]common.Hash, 0, storageSets)
|
||||||
roots = make([]common.Hash, 0, maxStorageSetRequestCount)
|
roots = make([]common.Hash, 0, storageSets)
|
||||||
subtask *storageTask
|
subtask *storageTask
|
||||||
)
|
)
|
||||||
for account, subtasks := range task.SubTasks {
|
for account, subtasks := range task.SubTasks {
|
||||||
@ -1096,7 +1151,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
accounts = append(accounts, acccount)
|
accounts = append(accounts, acccount)
|
||||||
roots = append(roots, root)
|
roots = append(roots, root)
|
||||||
|
|
||||||
if len(accounts) >= maxStorageSetRequestCount {
|
if len(accounts) >= storageSets {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1109,6 +1164,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
req := &storageRequest{
|
req := &storageRequest{
|
||||||
peer: idle,
|
peer: idle,
|
||||||
id: reqid,
|
id: reqid,
|
||||||
|
time: time.Now(),
|
||||||
deliver: success,
|
deliver: success,
|
||||||
revert: fail,
|
revert: fail,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -1122,8 +1178,9 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
req.origin = subtask.Next
|
req.origin = subtask.Next
|
||||||
req.limit = subtask.Last
|
req.limit = subtask.Last
|
||||||
}
|
}
|
||||||
req.timeout = time.AfterFunc(requestTimeout, func() {
|
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
|
||||||
peer.Log().Debug("Storage request timed out", "reqid", reqid)
|
peer.Log().Debug("Storage request timed out", "reqid", reqid)
|
||||||
|
s.rates.Update(idle, StorageRangesMsg, 0, 0)
|
||||||
s.scheduleRevertStorageRequest(req)
|
s.scheduleRevertStorageRequest(req)
|
||||||
})
|
})
|
||||||
s.storageReqs[reqid] = req
|
s.storageReqs[reqid] = req
|
||||||
@ -1138,7 +1195,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
|
|||||||
if subtask != nil {
|
if subtask != nil {
|
||||||
origin, limit = req.origin[:], req.limit[:]
|
origin, limit = req.origin[:], req.limit[:]
|
||||||
}
|
}
|
||||||
if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil {
|
if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, uint64(cap)); err != nil {
|
||||||
log.Debug("Failed to request storage", "err", err)
|
log.Debug("Failed to request storage", "err", err)
|
||||||
s.scheduleRevertStorageRequest(req)
|
s.scheduleRevertStorageRequest(req)
|
||||||
}
|
}
|
||||||
@ -1157,10 +1214,24 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
|||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// If there are no idle peers, short circuit assignment
|
// Sort the peers by download capacity to use faster ones if many available
|
||||||
if len(s.trienodeHealIdlers) == 0 {
|
idlers := &capacitySort{
|
||||||
|
ids: make([]string, 0, len(s.trienodeHealIdlers)),
|
||||||
|
caps: make([]float64, 0, len(s.trienodeHealIdlers)),
|
||||||
|
}
|
||||||
|
targetTTL := s.rates.TargetTimeout()
|
||||||
|
for id := range s.trienodeHealIdlers {
|
||||||
|
if _, ok := s.statelessPeers[id]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idlers.ids = append(idlers.ids, id)
|
||||||
|
idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL))
|
||||||
|
}
|
||||||
|
if len(idlers.ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
sort.Sort(sort.Reverse(idlers))
|
||||||
|
|
||||||
// Iterate over pending tasks and try to find a peer to retrieve with
|
// Iterate over pending tasks and try to find a peer to retrieve with
|
||||||
for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
|
for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
|
||||||
// If there are not enough trie tasks queued to fully assign, fill the
|
// If there are not enough trie tasks queued to fully assign, fill the
|
||||||
@ -1186,20 +1257,15 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
|||||||
// Task pending retrieval, try to find an idle peer. If no such peer
|
// Task pending retrieval, try to find an idle peer. If no such peer
|
||||||
// exists, we probably assigned tasks for all (or they are stateless).
|
// exists, we probably assigned tasks for all (or they are stateless).
|
||||||
// Abort the entire assignment mechanism.
|
// Abort the entire assignment mechanism.
|
||||||
var idle string
|
if len(idlers.ids) == 0 {
|
||||||
for id := range s.trienodeHealIdlers {
|
|
||||||
// If the peer rejected a query in this sync cycle, don't bother asking
|
|
||||||
// again for anything, it's either out of sync or already pruned
|
|
||||||
if _, ok := s.statelessPeers[id]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
idle = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if idle == "" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peer := s.peers[idle]
|
var (
|
||||||
|
idle = idlers.ids[0]
|
||||||
|
peer = s.peers[idle]
|
||||||
|
cap = idlers.caps[0]
|
||||||
|
)
|
||||||
|
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||||
|
|
||||||
// Matched a pending task to an idle peer, allocate a unique request id
|
// Matched a pending task to an idle peer, allocate a unique request id
|
||||||
var reqid uint64
|
var reqid uint64
|
||||||
@ -1214,10 +1280,13 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Generate the network query and send it to the peer
|
// Generate the network query and send it to the peer
|
||||||
|
if cap > maxTrieRequestCount {
|
||||||
|
cap = maxTrieRequestCount
|
||||||
|
}
|
||||||
var (
|
var (
|
||||||
hashes = make([]common.Hash, 0, maxTrieRequestCount)
|
hashes = make([]common.Hash, 0, int(cap))
|
||||||
paths = make([]trie.SyncPath, 0, maxTrieRequestCount)
|
paths = make([]trie.SyncPath, 0, int(cap))
|
||||||
pathsets = make([]TrieNodePathSet, 0, maxTrieRequestCount)
|
pathsets = make([]TrieNodePathSet, 0, int(cap))
|
||||||
)
|
)
|
||||||
for hash, pathset := range s.healer.trieTasks {
|
for hash, pathset := range s.healer.trieTasks {
|
||||||
delete(s.healer.trieTasks, hash)
|
delete(s.healer.trieTasks, hash)
|
||||||
@ -1226,13 +1295,14 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
|||||||
paths = append(paths, pathset)
|
paths = append(paths, pathset)
|
||||||
pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
|
pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
|
||||||
|
|
||||||
if len(hashes) >= maxTrieRequestCount {
|
if len(hashes) >= int(cap) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req := &trienodeHealRequest{
|
req := &trienodeHealRequest{
|
||||||
peer: idle,
|
peer: idle,
|
||||||
id: reqid,
|
id: reqid,
|
||||||
|
time: time.Now(),
|
||||||
deliver: success,
|
deliver: success,
|
||||||
revert: fail,
|
revert: fail,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -1241,8 +1311,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
|||||||
paths: paths,
|
paths: paths,
|
||||||
task: s.healer,
|
task: s.healer,
|
||||||
}
|
}
|
||||||
req.timeout = time.AfterFunc(requestTimeout, func() {
|
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
|
||||||
peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
|
peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
|
||||||
|
s.rates.Update(idle, TrieNodesMsg, 0, 0)
|
||||||
s.scheduleRevertTrienodeHealRequest(req)
|
s.scheduleRevertTrienodeHealRequest(req)
|
||||||
})
|
})
|
||||||
s.trienodeHealReqs[reqid] = req
|
s.trienodeHealReqs[reqid] = req
|
||||||
@ -1267,10 +1338,24 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
|
|||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// If there are no idle peers, short circuit assignment
|
// Sort the peers by download capacity to use faster ones if many available
|
||||||
if len(s.bytecodeHealIdlers) == 0 {
|
idlers := &capacitySort{
|
||||||
|
ids: make([]string, 0, len(s.bytecodeHealIdlers)),
|
||||||
|
caps: make([]float64, 0, len(s.bytecodeHealIdlers)),
|
||||||
|
}
|
||||||
|
targetTTL := s.rates.TargetTimeout()
|
||||||
|
for id := range s.bytecodeHealIdlers {
|
||||||
|
if _, ok := s.statelessPeers[id]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
idlers.ids = append(idlers.ids, id)
|
||||||
|
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
|
||||||
|
}
|
||||||
|
if len(idlers.ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
sort.Sort(sort.Reverse(idlers))
|
||||||
|
|
||||||
// Iterate over pending tasks and try to find a peer to retrieve with
|
// Iterate over pending tasks and try to find a peer to retrieve with
|
||||||
for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
|
for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
|
||||||
// If there are not enough trie tasks queued to fully assign, fill the
|
// If there are not enough trie tasks queued to fully assign, fill the
|
||||||
@ -1296,20 +1381,15 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
|
|||||||
// Task pending retrieval, try to find an idle peer. If no such peer
|
// Task pending retrieval, try to find an idle peer. If no such peer
|
||||||
// exists, we probably assigned tasks for all (or they are stateless).
|
// exists, we probably assigned tasks for all (or they are stateless).
|
||||||
// Abort the entire assignment mechanism.
|
// Abort the entire assignment mechanism.
|
||||||
var idle string
|
if len(idlers.ids) == 0 {
|
||||||
for id := range s.bytecodeHealIdlers {
|
|
||||||
// If the peer rejected a query in this sync cycle, don't bother asking
|
|
||||||
// again for anything, it's either out of sync or already pruned
|
|
||||||
if _, ok := s.statelessPeers[id]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
idle = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if idle == "" {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peer := s.peers[idle]
|
var (
|
||||||
|
idle = idlers.ids[0]
|
||||||
|
peer = s.peers[idle]
|
||||||
|
cap = idlers.caps[0]
|
||||||
|
)
|
||||||
|
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||||
|
|
||||||
// Matched a pending task to an idle peer, allocate a unique request id
|
// Matched a pending task to an idle peer, allocate a unique request id
|
||||||
var reqid uint64
|
var reqid uint64
|
||||||
@ -1324,18 +1404,22 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Generate the network query and send it to the peer
|
// Generate the network query and send it to the peer
|
||||||
hashes := make([]common.Hash, 0, maxCodeRequestCount)
|
if cap > maxCodeRequestCount {
|
||||||
|
cap = maxCodeRequestCount
|
||||||
|
}
|
||||||
|
hashes := make([]common.Hash, 0, int(cap))
|
||||||
for hash := range s.healer.codeTasks {
|
for hash := range s.healer.codeTasks {
|
||||||
delete(s.healer.codeTasks, hash)
|
delete(s.healer.codeTasks, hash)
|
||||||
|
|
||||||
hashes = append(hashes, hash)
|
hashes = append(hashes, hash)
|
||||||
if len(hashes) >= maxCodeRequestCount {
|
if len(hashes) >= int(cap) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req := &bytecodeHealRequest{
|
req := &bytecodeHealRequest{
|
||||||
peer: idle,
|
peer: idle,
|
||||||
id: reqid,
|
id: reqid,
|
||||||
|
time: time.Now(),
|
||||||
deliver: success,
|
deliver: success,
|
||||||
revert: fail,
|
revert: fail,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -1343,8 +1427,9 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
|
|||||||
hashes: hashes,
|
hashes: hashes,
|
||||||
task: s.healer,
|
task: s.healer,
|
||||||
}
|
}
|
||||||
req.timeout = time.AfterFunc(requestTimeout, func() {
|
req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
|
||||||
peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
|
peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
|
||||||
|
s.rates.Update(idle, ByteCodesMsg, 0, 0)
|
||||||
s.scheduleRevertBytecodeHealRequest(req)
|
s.scheduleRevertBytecodeHealRequest(req)
|
||||||
})
|
})
|
||||||
s.bytecodeHealReqs[reqid] = req
|
s.bytecodeHealReqs[reqid] = req
|
||||||
@ -2142,6 +2227,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(s.accountReqs, id)
|
delete(s.accountReqs, id)
|
||||||
|
s.rates.Update(peer.ID(), AccountRangeMsg, time.Since(req.time), int(size))
|
||||||
|
|
||||||
// Clean up the request timeout timer, we'll see how to proceed further based
|
// Clean up the request timeout timer, we'll see how to proceed further based
|
||||||
// on the actual delivered content
|
// on the actual delivered content
|
||||||
@ -2253,6 +2339,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(s.bytecodeReqs, id)
|
delete(s.bytecodeReqs, id)
|
||||||
|
s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
|
||||||
|
|
||||||
// Clean up the request timeout timer, we'll see how to proceed further based
|
// Clean up the request timeout timer, we'll see how to proceed further based
|
||||||
// on the actual delivered content
|
// on the actual delivered content
|
||||||
@ -2361,6 +2448,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(s.storageReqs, id)
|
delete(s.storageReqs, id)
|
||||||
|
s.rates.Update(peer.ID(), StorageRangesMsg, time.Since(req.time), int(size))
|
||||||
|
|
||||||
// Clean up the request timeout timer, we'll see how to proceed further based
|
// Clean up the request timeout timer, we'll see how to proceed further based
|
||||||
// on the actual delivered content
|
// on the actual delivered content
|
||||||
@ -2487,6 +2575,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(s.trienodeHealReqs, id)
|
delete(s.trienodeHealReqs, id)
|
||||||
|
s.rates.Update(peer.ID(), TrieNodesMsg, time.Since(req.time), len(trienodes))
|
||||||
|
|
||||||
// Clean up the request timeout timer, we'll see how to proceed further based
|
// Clean up the request timeout timer, we'll see how to proceed further based
|
||||||
// on the actual delivered content
|
// on the actual delivered content
|
||||||
@ -2581,6 +2670,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
delete(s.bytecodeHealReqs, id)
|
delete(s.bytecodeHealReqs, id)
|
||||||
|
s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
|
||||||
|
|
||||||
// Clean up the request timeout timer, we'll see how to proceed further based
|
// Clean up the request timeout timer, we'll see how to proceed further based
|
||||||
// on the actual delivered content
|
// on the actual delivered content
|
||||||
@ -2756,3 +2846,24 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) {
|
|||||||
}
|
}
|
||||||
return space.Uint64() - uint64(hashes), nil
|
return space.Uint64() - uint64(hashes), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// capacitySort implements the Sort interface, allowing sorting by peer message
|
||||||
|
// throughput. Note, callers should use sort.Reverse to get the desired effect
|
||||||
|
// of highest capacity being at the front.
|
||||||
|
type capacitySort struct {
|
||||||
|
ids []string
|
||||||
|
caps []float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *capacitySort) Len() int {
|
||||||
|
return len(s.ids)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *capacitySort) Less(i, j int) bool {
|
||||||
|
return s.caps[i] < s.caps[j]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *capacitySort) Swap(i, j int) {
|
||||||
|
s.ids[i], s.ids[j] = s.ids[j], s.ids[i]
|
||||||
|
s.caps[i], s.caps[j] = s.caps[j], s.caps[i]
|
||||||
|
}
|
||||||
|
@ -796,12 +796,6 @@ func TestMultiSyncManyUseless(t *testing.T) {
|
|||||||
|
|
||||||
// TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all
|
// TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all
|
||||||
func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) {
|
func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) {
|
||||||
// We're setting the timeout to very low, to increase the chance of the timeout
|
|
||||||
// being triggered. This was previously a cause of panic, when a response
|
|
||||||
// arrived simultaneously as a timeout was triggered.
|
|
||||||
defer func(old time.Duration) { requestTimeout = old }(requestTimeout)
|
|
||||||
requestTimeout = time.Millisecond
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
once sync.Once
|
once sync.Once
|
||||||
cancel = make(chan struct{})
|
cancel = make(chan struct{})
|
||||||
@ -838,6 +832,11 @@ func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) {
|
|||||||
mkSource("noStorage", true, false, true),
|
mkSource("noStorage", true, false, true),
|
||||||
mkSource("noTrie", true, true, false),
|
mkSource("noTrie", true, true, false),
|
||||||
)
|
)
|
||||||
|
// We're setting the timeout to very low, to increase the chance of the timeout
|
||||||
|
// being triggered. This was previously a cause of panic, when a response
|
||||||
|
// arrived simultaneously as a timeout was triggered.
|
||||||
|
syncer.rates.OverrideTTLLimit = time.Millisecond
|
||||||
|
|
||||||
done := checkStall(t, term)
|
done := checkStall(t, term)
|
||||||
if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
|
if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
|
||||||
t.Fatalf("sync failed: %v", err)
|
t.Fatalf("sync failed: %v", err)
|
||||||
@ -848,10 +847,6 @@ func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) {
|
|||||||
|
|
||||||
// TestMultiSyncManyUnresponsive contains one good peer, and many which doesn't respond at all
|
// TestMultiSyncManyUnresponsive contains one good peer, and many which doesn't respond at all
|
||||||
func TestMultiSyncManyUnresponsive(t *testing.T) {
|
func TestMultiSyncManyUnresponsive(t *testing.T) {
|
||||||
// We're setting the timeout to very low, to make the test run a bit faster
|
|
||||||
defer func(old time.Duration) { requestTimeout = old }(requestTimeout)
|
|
||||||
requestTimeout = time.Millisecond
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
once sync.Once
|
once sync.Once
|
||||||
cancel = make(chan struct{})
|
cancel = make(chan struct{})
|
||||||
@ -888,6 +883,9 @@ func TestMultiSyncManyUnresponsive(t *testing.T) {
|
|||||||
mkSource("noStorage", true, false, true),
|
mkSource("noStorage", true, false, true),
|
||||||
mkSource("noTrie", true, true, false),
|
mkSource("noTrie", true, true, false),
|
||||||
)
|
)
|
||||||
|
// We're setting the timeout to very low, to make the test run a bit faster
|
||||||
|
syncer.rates.OverrideTTLLimit = time.Millisecond
|
||||||
|
|
||||||
done := checkStall(t, term)
|
done := checkStall(t, term)
|
||||||
if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
|
if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
|
||||||
t.Fatalf("sync failed: %v", err)
|
t.Fatalf("sync failed: %v", err)
|
||||||
|
458
p2p/msgrate/msgrate.go
Normal file
458
p2p/msgrate/msgrate.go
Normal file
@ -0,0 +1,458 @@
|
|||||||
|
// Copyright 2021 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// Package msgrate allows estimating the throughput of peers for more balanced syncs.
|
||||||
|
package msgrate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// measurementImpact is the impact a single measurement has on a peer's final
|
||||||
|
// capacity value. A value closer to 0 reacts slower to sudden network changes,
|
||||||
|
// but it is also more stable against temporary hiccups. 0.1 worked well for
|
||||||
|
// most of Ethereum's existence, so might as well go with it.
|
||||||
|
const measurementImpact = 0.1
|
||||||
|
|
||||||
|
// capacityOverestimation is the ratio of items to over-estimate when retrieving
|
||||||
|
// a peer's capacity to avoid locking into a lower value due to never attempting
|
||||||
|
// to fetch more than some local stable value.
|
||||||
|
const capacityOverestimation = 1.01
|
||||||
|
|
||||||
|
// qosTuningPeers is the number of best peers to tune round trip times based on.
|
||||||
|
// An Ethereum node doesn't need hundreds of connections to operate correctly,
|
||||||
|
// so instead of lowering our download speed to the median of potentially many
|
||||||
|
// bad nodes, we can target a smaller set of vey good nodes. At worse this will
|
||||||
|
// result in less nodes to sync from, but that's still better than some hogging
|
||||||
|
// the pipeline.
|
||||||
|
const qosTuningPeers = 5
|
||||||
|
|
||||||
|
// rttMinEstimate is the minimal round trip time to target requests for. Since
|
||||||
|
// every request entails a 2 way latency + bandwidth + serving database lookups,
|
||||||
|
// it should be generous enough to permit meaningful work to be done on top of
|
||||||
|
// the transmission costs.
|
||||||
|
const rttMinEstimate = 2 * time.Second
|
||||||
|
|
||||||
|
// rttMaxEstimate is the maximal round trip time to target requests for. Although
|
||||||
|
// the expectation is that a well connected node will never reach this, certain
|
||||||
|
// special connectivity ones might experience significant delays (e.g. satellite
|
||||||
|
// uplink with 3s RTT). This value should be low enough to forbid stalling the
|
||||||
|
// pipeline too long, but large enough to cover the worst of the worst links.
|
||||||
|
const rttMaxEstimate = 20 * time.Second
|
||||||
|
|
||||||
|
// rttPushdownFactor is a multiplier to attempt forcing quicker requests than
|
||||||
|
// what the message rate tracker estimates. The reason is that message rate
|
||||||
|
// tracking adapts queries to the RTT, but multiple RTT values can be perfectly
|
||||||
|
// valid, they just result in higher packet sizes. Since smaller packets almost
|
||||||
|
// always result in stabler download streams, this factor hones in on the lowest
|
||||||
|
// RTT from all the functional ones.
|
||||||
|
const rttPushdownFactor = 0.9
|
||||||
|
|
||||||
|
// rttMinConfidence is the minimum value the roundtrip confidence factor may drop
|
||||||
|
// to. Since the target timeouts are based on how confident the tracker is in the
|
||||||
|
// true roundtrip, it's important to not allow too huge fluctuations.
|
||||||
|
const rttMinConfidence = 0.1
|
||||||
|
|
||||||
|
// ttlScaling is the multiplier that converts the estimated roundtrip time to a
|
||||||
|
// timeout cap for network requests. The expectation is that peers' response time
|
||||||
|
// will fluctuate around the estimated roundtrip, but depending in their load at
|
||||||
|
// request time, it might be higher than anticipated. This scaling factor ensures
|
||||||
|
// that we allow remote connections some slack but at the same time do enforce a
|
||||||
|
// behavior similar to our median peers.
|
||||||
|
const ttlScaling = 3
|
||||||
|
|
||||||
|
// ttlLimit is the maximum timeout allowance to prevent reaching crazy numbers
|
||||||
|
// if some unforeseen network events shappen. As much as we try to hone in on
|
||||||
|
// the most optimal values, it doesn't make any sense to go above a threshold,
|
||||||
|
// even if everything is slow and screwy.
|
||||||
|
const ttlLimit = time.Minute
|
||||||
|
|
||||||
|
// tuningConfidenceCap is the number of active peers above which to stop detuning
|
||||||
|
// the confidence number. The idea here is that once we hone in on the capacity
|
||||||
|
// of a meaningful number of peers, adding one more should ot have a significant
|
||||||
|
// impact on things, so just ron with the originals.
|
||||||
|
const tuningConfidenceCap = 10
|
||||||
|
|
||||||
|
// tuningImpact is the influence that a new tuning target has on the previously
|
||||||
|
// cached value. This number is mostly just an out-of-the-blue heuristic that
|
||||||
|
// prevents the estimates from jumping around. There's no particular reason for
|
||||||
|
// the current value.
|
||||||
|
const tuningImpact = 0.25
|
||||||
|
|
||||||
|
// Tracker estimates the throughput capacity of a peer with regard to each data
|
||||||
|
// type it can deliver. The goal is to dynamically adjust request sizes to max
|
||||||
|
// out network throughput without overloading either the peer or th elocal node.
|
||||||
|
//
|
||||||
|
// By tracking in real time the latencies and bandiwdths peers exhibit for each
|
||||||
|
// packet type, it's possible to prevent overloading by detecting a slowdown on
|
||||||
|
// one type when another type is pushed too hard.
|
||||||
|
//
|
||||||
|
// Similarly, real time measurements also help avoid overloading the local net
|
||||||
|
// connection if our peers would otherwise be capable to deliver more, but the
|
||||||
|
// local link is saturated. In that case, the live measurements will force us
|
||||||
|
// to reduce request sizes until the throughput gets stable.
|
||||||
|
//
|
||||||
|
// Lastly, message rate measurements allows us to detect if a peer is unsuaully
|
||||||
|
// slow compared to other peers, in which case we can decide to keep it around
|
||||||
|
// or free up the slot so someone closer.
|
||||||
|
//
|
||||||
|
// Since throughput tracking and estimation adapts dynamically to live network
|
||||||
|
// conditions, it's fine to have multiple trackers locally track the same peer
|
||||||
|
// in different subsystem. The throughput will simply be distributed across the
|
||||||
|
// two trackers if both are highly active.
|
||||||
|
type Tracker struct {
|
||||||
|
// capacity is the number of items retrievable per second of a given type.
|
||||||
|
// It is analogous to bandwidth, but we deliberately avoided using bytes
|
||||||
|
// as the unit, since serving nodes also spend a lot of time loading data
|
||||||
|
// from disk, which is linear in the number of items, but mostly constant
|
||||||
|
// in their sizes.
|
||||||
|
//
|
||||||
|
// Callers of course are free to use the item counter as a byte counter if
|
||||||
|
// or when their protocol of choise if capped by bytes instead of items.
|
||||||
|
// (eg. eth.getHeaders vs snap.getAccountRange).
|
||||||
|
capacity map[uint64]float64
|
||||||
|
|
||||||
|
// roundtrip is the latency a peer in general responds to data requests.
|
||||||
|
// This number is not used inside the tracker, but is exposed to compare
|
||||||
|
// peers to each other and filter out slow ones. Note however, it only
|
||||||
|
// makes sense to compare RTTs if the caller caters request sizes for
|
||||||
|
// each peer to target the same RTT. There's no need to make this number
|
||||||
|
// the real networking RTT, we just need a number to compare peers with.
|
||||||
|
roundtrip time.Duration
|
||||||
|
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTracker creates a new message rate tracker for a specific peer. An initial
|
||||||
|
// RTT is needed to avoid a peer getting marked as an outlier compared to others
|
||||||
|
// right after joining. It's suggested to use the median rtt across all peers to
|
||||||
|
// init a new peer tracker.
|
||||||
|
func NewTracker(caps map[uint64]float64, rtt time.Duration) *Tracker {
|
||||||
|
if caps == nil {
|
||||||
|
caps = make(map[uint64]float64)
|
||||||
|
}
|
||||||
|
return &Tracker{
|
||||||
|
capacity: caps,
|
||||||
|
roundtrip: rtt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity calculates the number of items the peer is estimated to be able to
|
||||||
|
// retrieve within the alloted time slot. The method will round up any division
|
||||||
|
// errors and will add an additional overestimation ratio on top. The reason for
|
||||||
|
// overshooting the capacity is because certain message types might not increase
|
||||||
|
// the load proportionally to the requested items, so fetching a bit more might
|
||||||
|
// still take the same RTT. By forcefully overshooting by a small amount, we can
|
||||||
|
// avoid locking into a lower-that-real capacity.
|
||||||
|
func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
// Calculate the actual measured throughput
|
||||||
|
throughput := t.capacity[kind] * float64(targetRTT) / float64(time.Second)
|
||||||
|
|
||||||
|
// Return an overestimation to force the peer out of a stuck minima, adding
|
||||||
|
// +1 in case the item count is too low for the overestimator to dent
|
||||||
|
return 1 + capacityOverestimation*throughput
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update modifies the peer's capacity values for a specific data type with a new
|
||||||
|
// measurement. If the delivery is zero, the peer is assumed to have either timed
|
||||||
|
// out or to not have the requested data, resulting in a slash to 0 capacity. This
|
||||||
|
// avoids assigning the peer retrievals that it won't be able to honour.
|
||||||
|
func (t *Tracker) Update(kind uint64, elapsed time.Duration, items int) {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
// If nothing was delivered (timeout / unavailable data), reduce throughput
|
||||||
|
// to minimum
|
||||||
|
if items == 0 {
|
||||||
|
t.capacity[kind] = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Otherwise update the throughput with a new measurement
|
||||||
|
if elapsed <= 0 {
|
||||||
|
elapsed = 1 // +1 (ns) to ensure non-zero divisor
|
||||||
|
}
|
||||||
|
measured := float64(items) / (float64(elapsed) / float64(time.Second))
|
||||||
|
|
||||||
|
t.capacity[kind] = (1-measurementImpact)*(t.capacity[kind]) + measurementImpact*measured
|
||||||
|
t.roundtrip = time.Duration((1-measurementImpact)*float64(t.roundtrip) + measurementImpact*float64(elapsed))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trackers is a set of message rate trackers across a number of peers with the
|
||||||
|
// goal of aggregating certain measurements across the entire set for outlier
|
||||||
|
// filtering and newly joining initialization.
|
||||||
|
type Trackers struct {
|
||||||
|
trackers map[string]*Tracker
|
||||||
|
|
||||||
|
// roundtrip is the current best guess as to what is a stable round trip time
|
||||||
|
// across the entire collection of connected peers. This is derived from the
|
||||||
|
// various trackers added, but is used as a cache to avoid recomputing on each
|
||||||
|
// network request. The value is updated once every RTT to avoid fluctuations
|
||||||
|
// caused by hiccups or peer events.
|
||||||
|
roundtrip time.Duration
|
||||||
|
|
||||||
|
// confidence represents the probability that the estimated roundtrip value
|
||||||
|
// is the real one across all our peers. The confidence value is used as an
|
||||||
|
// impact factor of new measurements on old estimates. As our connectivity
|
||||||
|
// stabilizes, this value gravitates towards 1, new measurements havinng
|
||||||
|
// almost no impact. If there's a large peer churn and few peers, then new
|
||||||
|
// measurements will impact it more. The confidence is increased with every
|
||||||
|
// packet and dropped with every new connection.
|
||||||
|
confidence float64
|
||||||
|
|
||||||
|
// tuned is the time instance the tracker recalculated its cached roundtrip
|
||||||
|
// value and confidence values. A cleaner way would be to have a heartbeat
|
||||||
|
// goroutine do it regularly, but that requires a lot of maintenance to just
|
||||||
|
// run every now and again.
|
||||||
|
tuned time.Time
|
||||||
|
|
||||||
|
// The fields below can be used to override certain default values. Their
|
||||||
|
// purpose is to allow quicker tests. Don't use them in production.
|
||||||
|
OverrideTTLLimit time.Duration
|
||||||
|
|
||||||
|
log log.Logger
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTrackers creates an empty set of trackers to be filled with peers.
|
||||||
|
func NewTrackers(log log.Logger) *Trackers {
|
||||||
|
return &Trackers{
|
||||||
|
trackers: make(map[string]*Tracker),
|
||||||
|
roundtrip: rttMaxEstimate,
|
||||||
|
confidence: 1,
|
||||||
|
tuned: time.Now(),
|
||||||
|
OverrideTTLLimit: ttlLimit,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track inserts a new tracker into the set.
|
||||||
|
func (t *Trackers) Track(id string, tracker *Tracker) error {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := t.trackers[id]; ok {
|
||||||
|
return errors.New("already tracking")
|
||||||
|
}
|
||||||
|
t.trackers[id] = tracker
|
||||||
|
t.detune()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Untrack stops tracking a previously added peer.
|
||||||
|
func (t *Trackers) Untrack(id string) error {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := t.trackers[id]; !ok {
|
||||||
|
return errors.New("not tracking")
|
||||||
|
}
|
||||||
|
delete(t.trackers, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MedianRoundTrip returns the median RTT across all known trackers. The purpose
|
||||||
|
// of the median RTT is to initialize a new peer with sane statistics that it will
|
||||||
|
// hopefully outperform. If it seriously underperforms, there's a risk of dropping
|
||||||
|
// the peer, but that is ok as we're aiming for a strong median.
|
||||||
|
func (t *Trackers) MedianRoundTrip() time.Duration {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return t.medianRoundTrip()
|
||||||
|
}
|
||||||
|
|
||||||
|
// medianRoundTrip is the internal lockless version of MedianRoundTrip to be used
|
||||||
|
// by the QoS tuner.
|
||||||
|
func (t *Trackers) medianRoundTrip() time.Duration {
|
||||||
|
// Gather all the currently measured round trip times
|
||||||
|
rtts := make([]float64, 0, len(t.trackers))
|
||||||
|
for _, tt := range t.trackers {
|
||||||
|
tt.lock.RLock()
|
||||||
|
rtts = append(rtts, float64(tt.roundtrip))
|
||||||
|
tt.lock.RUnlock()
|
||||||
|
}
|
||||||
|
sort.Float64s(rtts)
|
||||||
|
|
||||||
|
median := rttMaxEstimate
|
||||||
|
if qosTuningPeers <= len(rtts) {
|
||||||
|
median = time.Duration(rtts[qosTuningPeers/2]) // Median of our best few peers
|
||||||
|
} else if len(rtts) > 0 {
|
||||||
|
median = time.Duration(rtts[len(rtts)/2]) // Median of all out connected peers
|
||||||
|
}
|
||||||
|
// Restrict the RTT into some QoS defaults, irrelevant of true RTT
|
||||||
|
if median < rttMinEstimate {
|
||||||
|
median = rttMinEstimate
|
||||||
|
}
|
||||||
|
if median > rttMaxEstimate {
|
||||||
|
median = rttMaxEstimate
|
||||||
|
}
|
||||||
|
return median
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeanCapacities returns the capacities averaged across all the added trackers.
|
||||||
|
// The purpos of the mean capacities are to initialize a new peer with some sane
|
||||||
|
// starting values that it will hopefully outperform. If the mean overshoots, the
|
||||||
|
// peer will be cut back to minimal capacity and given another chance.
|
||||||
|
func (t *Trackers) MeanCapacities() map[uint64]float64 {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return t.meanCapacities()
|
||||||
|
}
|
||||||
|
|
||||||
|
// meanCapacities is the internal lockless version of MeanCapacities used for
|
||||||
|
// debug logging.
|
||||||
|
func (t *Trackers) meanCapacities() map[uint64]float64 {
|
||||||
|
capacities := make(map[uint64]float64)
|
||||||
|
for _, tt := range t.trackers {
|
||||||
|
tt.lock.RLock()
|
||||||
|
for key, val := range tt.capacity {
|
||||||
|
capacities[key] += val
|
||||||
|
}
|
||||||
|
tt.lock.RUnlock()
|
||||||
|
}
|
||||||
|
for key, val := range capacities {
|
||||||
|
capacities[key] = val / float64(len(t.trackers))
|
||||||
|
}
|
||||||
|
return capacities
|
||||||
|
}
|
||||||
|
|
||||||
|
// TargetRoundTrip returns the current target round trip time for a request to
|
||||||
|
// complete in.The returned RTT is slightly under the estimated RTT. The reason
|
||||||
|
// is that message rate estimation is a 2 dimensional problem which is solvable
|
||||||
|
// for any RTT. The goal is to gravitate towards smaller RTTs instead of large
|
||||||
|
// messages, to result in a stabler download stream.
|
||||||
|
func (t *Trackers) TargetRoundTrip() time.Duration {
|
||||||
|
// Recalculate the internal caches if it's been a while
|
||||||
|
t.tune()
|
||||||
|
|
||||||
|
// Caches surely recent, return target roundtrip
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return time.Duration(float64(t.roundtrip) * rttPushdownFactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TargetTimeout returns the timeout allowance for a single request to finish
|
||||||
|
// under. The timeout is proportional to the roundtrip, but also takes into
|
||||||
|
// consideration the tracker's confidence in said roundtrip and scales it
|
||||||
|
// accordingly. The final value is capped to avoid runaway requests.
|
||||||
|
func (t *Trackers) TargetTimeout() time.Duration {
|
||||||
|
// Recalculate the internal caches if it's been a while
|
||||||
|
t.tune()
|
||||||
|
|
||||||
|
// Caches surely recent, return target timeout
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
return t.targetTimeout()
|
||||||
|
}
|
||||||
|
|
||||||
|
// targetTimeout is the internal lockless version of TargetTimeout to be used
|
||||||
|
// during QoS tuning.
|
||||||
|
func (t *Trackers) targetTimeout() time.Duration {
|
||||||
|
timeout := time.Duration(ttlScaling * float64(t.roundtrip) / t.confidence)
|
||||||
|
if timeout > t.OverrideTTLLimit {
|
||||||
|
timeout = t.OverrideTTLLimit
|
||||||
|
}
|
||||||
|
return timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
// tune gathers the individual tracker statistics and updates the estimated
|
||||||
|
// request round trip time.
|
||||||
|
func (t *Trackers) tune() {
|
||||||
|
// Tune may be called concurrently all over the place, but we only want to
|
||||||
|
// periodically update and even then only once. First check if it was updated
|
||||||
|
// recently and abort if so.
|
||||||
|
t.lock.RLock()
|
||||||
|
dirty := time.Since(t.tuned) > t.roundtrip
|
||||||
|
t.lock.RUnlock()
|
||||||
|
if !dirty {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If an update is needed, obtain a write lock but make sure we don't update
|
||||||
|
// it on all concurrent threads one by one.
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
if dirty := time.Since(t.tuned) > t.roundtrip; !dirty {
|
||||||
|
return // A concurrent request beat us to the tuning
|
||||||
|
}
|
||||||
|
// First thread reaching the tuning point, update the estimates and return
|
||||||
|
t.roundtrip = time.Duration((1-tuningImpact)*float64(t.roundtrip) + tuningImpact*float64(t.medianRoundTrip()))
|
||||||
|
t.confidence = t.confidence + (1-t.confidence)/2
|
||||||
|
|
||||||
|
t.tuned = time.Now()
|
||||||
|
t.log.Debug("Recalculated msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout(), "next", t.tuned.Add(t.roundtrip))
|
||||||
|
t.log.Trace("Debug dump of mean capacities", "caps", log.Lazy{Fn: t.meanCapacities})
|
||||||
|
}
|
||||||
|
|
||||||
|
// detune reduces the tracker's confidence in order to make fresh measurements
|
||||||
|
// have a larger impact on the estimates. It is meant to be used during new peer
|
||||||
|
// connections so they can have a proper impact on the estimates.
|
||||||
|
func (t *Trackers) detune() {
|
||||||
|
// If we have a single peer, confidence is always 1
|
||||||
|
if len(t.trackers) == 1 {
|
||||||
|
t.confidence = 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If we have a ton of peers, don't drop the confidence since there's enough
|
||||||
|
// remaining to retain the same throughput
|
||||||
|
if len(t.trackers) >= tuningConfidenceCap {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Otherwise drop the confidence factor
|
||||||
|
peers := float64(len(t.trackers))
|
||||||
|
|
||||||
|
t.confidence = t.confidence * (peers - 1) / peers
|
||||||
|
if t.confidence < rttMinConfidence {
|
||||||
|
t.confidence = rttMinConfidence
|
||||||
|
}
|
||||||
|
t.log.Debug("Relaxed msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity is a helper function to access a specific tracker without having to
|
||||||
|
// track it explicitly outside.
|
||||||
|
func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) float64 {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
tracker := t.trackers[id]
|
||||||
|
if tracker == nil {
|
||||||
|
return 1 // Unregister race, don't return 0, it's a dangerous number
|
||||||
|
}
|
||||||
|
return tracker.Capacity(kind, targetRTT)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update is a helper function to access a specific tracker without having to
|
||||||
|
// track it explicitly outside.
|
||||||
|
func (t *Trackers) Update(id string, kind uint64, elapsed time.Duration, items int) {
|
||||||
|
t.lock.RLock()
|
||||||
|
defer t.lock.RUnlock()
|
||||||
|
|
||||||
|
if tracker := t.trackers[id]; tracker != nil {
|
||||||
|
tracker.Update(kind, elapsed, items)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user