Merge pull request #2627 from karalabe/concurrent-head-sync

eth/downloader, trie: pull head state concurrently with chain
This commit is contained in:
Péter Szilágyi 2016-05-31 11:52:16 +03:00
commit 1d5d21726a
3 changed files with 41 additions and 30 deletions

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
) )
@ -114,7 +115,6 @@ type Downloader struct {
// Statistics // Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsStateTotal uint64 // Total number of node state entries known so far
syncStatsStateDone uint64 // Number of state trie entries already pulled syncStatsStateDone uint64 // Number of state trie entries already pulled
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
empty = true empty = true
} }
} }
// Reset any ephemeral sync statistics
d.syncStatsLock.Lock()
d.syncStatsStateTotal = 0
d.syncStatsStateDone = 0
d.syncStatsLock.Unlock()
// Create cancel channel for aborting mid-flight // Create cancel channel for aborting mid-flight
d.cancelLock.Lock() d.cancelLock.Lock()
d.cancelCh = make(chan struct{}) d.cancelCh = make(chan struct{})
@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsLock.Unlock() d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm // Initiate the sync using a concurrent hash and block retrieval algorithm
d.queue.Prepare(origin+1, d.mode, 0) d.queue.Prepare(origin+1, d.mode, 0, nil)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
@ -397,7 +391,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if err != nil { if err != nil {
return err return err
} }
origin, err := d.findAncestor(p, latest) height := latest.Number.Uint64()
origin, err := d.findAncestor(p, height)
if err != nil { if err != nil {
return err return err
} }
@ -405,22 +401,22 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin d.syncStatsChainOrigin = origin
} }
d.syncStatsChainHeight = latest d.syncStatsChainHeight = height
d.syncStatsLock.Unlock() d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent header and content retrieval algorithm // Initiate the sync using a concurrent header and content retrieval algorithm
pivot := uint64(0) pivot := uint64(0)
switch d.mode { switch d.mode {
case LightSync: case LightSync:
pivot = latest pivot = height
case FastSync: case FastSync:
// Calculate the new fast/slow sync pivot point // Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
} }
if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64() pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
} }
// If the point is below the origin, move origin back to ensure state download // If the point is below the origin, move origin back to ensure state download
if pivot < origin { if pivot < origin {
@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
} }
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
} }
d.queue.Prepare(origin+1, d.mode, pivot) d.queue.Prepare(origin+1, d.mode, pivot, latest)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, height)
} }
return d.spawnSync(origin+1, return d.spawnSync(origin+1,
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// fetchHeight retrieves the head header of the remote peer to aid in estimating // fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take. // the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (uint64, error) { func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
// Request the advertised remote head block and wait for the response // Request the advertised remote head block and wait for the response
@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelBlockFetch return nil, errCancelBlockFetch
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
headers := packet.(*headerPack).headers headers := packet.(*headerPack).headers
if len(headers) != 1 { if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer return nil, errBadPeer
} }
return headers[0].Number.Uint64(), nil return headers[0], nil
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p) glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout return nil, errTimeout
case <-d.bodyCh: case <-d.bodyCh:
case <-d.stateCh: case <-d.stateCh:
@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error {
deliver = func(packet dataPack) (int, error) { deliver = func(packet dataPack) (int, error) {
start := time.Now() start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
// If the peer gave us nothing, stalling fast sync, drop // If the peer returned old-requested data, forgive
if delivered == 0 { if err == trie.ErrNotRequested {
glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId()) glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
d.dropPeer(packet.PeerId()) return
} }
if err != nil { if err != nil {
// If the node data processing failed, the root hash is very wrong, abort // If the node data processing failed, the root hash is very wrong, abort
@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error {
return return
} }
// Processing succeeded, notify state fetcher of continuation // Processing succeeded, notify state fetcher of continuation
if d.queue.PendingNodeData() > 0 { pending := d.queue.PendingNodeData()
if pending > 0 {
select { select {
case d.stateWakeCh <- true: case d.stateWakeCh <- true:
default: default:
} }
} }
// Log a message to the user and return
d.syncStatsLock.Lock() d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
d.syncStatsStateDone += uint64(delivered) d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) d.syncStatsLock.Unlock()
// Log a message to the user and return
if delivered > 0 {
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
}
}) })
} }
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }

@ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error,
// Prepare configures the result cache to allow accepting and caching inbound // Prepare configures the result cache to allow accepting and caching inbound
// fetch results. // fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Prepare the queue for sync results
if q.resultOffset < offset { if q.resultOffset < offset {
q.resultOffset = offset q.resultOffset = offset
} }
q.fastSyncPivot = pivot q.fastSyncPivot = pivot
q.mode = mode q.mode = mode
// If long running fast sync, also start up a head stateretrieval immediately
if mode == FastSync && pivot > 0 {
q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
}
} }

@ -17,6 +17,7 @@
package trie package trie
import ( import (
"errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -24,6 +25,10 @@ import (
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
) )
// ErrNotRequested is returned by the trie sync when it's requested to process a
// node it did not request.
var ErrNotRequested = errors.New("not requested")
// request represents a scheduled or already in-flight state retrieval request. // request represents a scheduled or already in-flight state retrieval request.
type request struct { type request struct {
hash common.Hash // Hash of the node data content to retrieve hash common.Hash // Hash of the node data content to retrieve
@ -144,7 +149,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
// If the item was not requested, bail out // If the item was not requested, bail out
request := s.requests[item.Hash] request := s.requests[item.Hash]
if request == nil { if request == nil {
return i, fmt.Errorf("not requested: %x", item.Hash) return i, ErrNotRequested
} }
// If the item is a raw entry request, commit directly // If the item is a raw entry request, commit directly
if request.object == nil { if request.object == nil {