cosensus, core, eth, params, trie: fixes + clique history cap

This commit is contained in:
Péter Szilágyi 2019-05-15 14:33:33 +03:00
parent 37d280da41
commit 536b3b416c
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
12 changed files with 112 additions and 53 deletions

@ -365,8 +365,11 @@ func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash commo
break break
} }
} }
// If we're at an checkpoint block, make a snapshot if it's known // If we're at the genesis, snapshot the initial state. Alternatively if we're
if number == 0 || (number%c.config.Epoch == 0 && chain.GetHeaderByNumber(number-1) == nil) { // at a checkpoint block without a parent (light client CHT), or we have piled
// up more headers than allowed to be reorged (chain reinit from a freezer),
// consider the checkpoint trusted and snapshot it.
if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.ImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) {
checkpoint := chain.GetHeaderByNumber(number) checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil { if checkpoint != nil {
hash := checkpoint.Hash() hash := checkpoint.Hash()

@ -20,10 +20,12 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"sort" "sort"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
) )
@ -197,7 +199,11 @@ func (s *Snapshot) apply(headers []*types.Header) (*Snapshot, error) {
// Iterate through the headers and create a new snapshot // Iterate through the headers and create a new snapshot
snap := s.copy() snap := s.copy()
for _, header := range headers { var (
start = time.Now()
logged = time.Now()
)
for i, header := range headers {
// Remove any votes on checkpoint blocks // Remove any votes on checkpoint blocks
number := header.Number.Uint64() number := header.Number.Uint64()
if number%s.config.Epoch == 0 { if number%s.config.Epoch == 0 {
@ -285,6 +291,14 @@ func (s *Snapshot) apply(headers []*types.Header) (*Snapshot, error) {
} }
delete(snap.Tally, header.Coinbase) delete(snap.Tally, header.Coinbase)
} }
// If we're taking too much time (ecrecover), notify the user once a while
if time.Since(logged) > 8*time.Second {
log.Info("Reconstructing voting history", "processed", i, "total", len(headers), "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
}
if time.Since(start) > 8*time.Second {
log.Info("Reconstructed voting history", "processed", len(headers), "elapsed", common.PrettyDuration(time.Since(start)))
} }
snap.Number += uint64(len(headers)) snap.Number += uint64(len(headers))
snap.Hash = headers[len(headers)-1].Hash() snap.Hash = headers[len(headers)-1].Hash()

@ -270,7 +270,7 @@ func (ethash *Ethash) remote(notify []string, noverify bool) {
start := time.Now() start := time.Now()
if !noverify { if !noverify {
if err := ethash.verifySeal(nil, header, true); err != nil { if err := ethash.verifySeal(nil, header, true); err != nil {
log.Warn("Invalid proof-of-work submitted", "sealhash", sealhash, "elapsed", time.Since(start), "err", err) log.Warn("Invalid proof-of-work submitted", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start)), "err", err)
return false return false
} }
} }
@ -279,7 +279,7 @@ func (ethash *Ethash) remote(notify []string, noverify bool) {
log.Warn("Ethash result channel is empty, submitted mining result is rejected") log.Warn("Ethash result channel is empty, submitted mining result is rejected")
return false return false
} }
log.Trace("Verified correct proof-of-work", "sealhash", sealhash, "elapsed", time.Since(start)) log.Trace("Verified correct proof-of-work", "sealhash", sealhash, "elapsed", common.PrettyDuration(time.Since(start)))
// Solutions seems to be valid, return to the miner and notify acceptance. // Solutions seems to be valid, return to the miner and notify acceptance.
solution := block.WithSeal(header) solution := block.WithSeal(header)

@ -221,6 +221,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// Initialize the chain with ancient data if it isn't empty. // Initialize the chain with ancient data if it isn't empty.
if bc.empty() { if bc.empty() {
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
start = time.Now()
logged time.Time
)
for i := uint64(0); i < frozen; i++ { for i := uint64(0); i < frozen; i++ {
// Inject hash<->number mapping. // Inject hash<->number mapping.
hash := rawdb.ReadCanonicalHash(bc.db, i) hash := rawdb.ReadCanonicalHash(bc.db, i)
@ -235,12 +239,23 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
return nil, errors.New("broken ancient database") return nil, errors.New("broken ancient database")
} }
rawdb.WriteTxLookupEntries(bc.db, block) rawdb.WriteTxLookupEntries(bc.db, block)
// If we've spent too much time already, notify the user of what we're doing
if time.Since(logged) > 8*time.Second {
log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
} }
hash := rawdb.ReadCanonicalHash(bc.db, frozen-1) hash := rawdb.ReadCanonicalHash(bc.db, frozen-1)
rawdb.WriteHeadHeaderHash(bc.db, hash) rawdb.WriteHeadHeaderHash(bc.db, hash)
rawdb.WriteHeadFastBlockHash(bc.db, hash) rawdb.WriteHeadFastBlockHash(bc.db, hash)
log.Info("Initialized chain with ancients", "number", frozen-1, "hash", hash) // The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true)
log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
} }
} }
if err := bc.loadLastState(); err != nil { if err := bc.loadLastState(); err != nil {

@ -190,6 +190,7 @@ func InspectDatabase(db ethdb.Database) error {
txlookupSize common.StorageSize txlookupSize common.StorageSize
preimageSize common.StorageSize preimageSize common.StorageSize
bloomBitsSize common.StorageSize bloomBitsSize common.StorageSize
cliqueSnapsSize common.StorageSize
// Ancient store statistics // Ancient store statistics
ancientHeaders common.StorageSize ancientHeaders common.StorageSize
@ -199,8 +200,12 @@ func InspectDatabase(db ethdb.Database) error {
ancientTds common.StorageSize ancientTds common.StorageSize
// Les statistic // Les statistic
ChtTrieNodes common.StorageSize chtTrieNodes common.StorageSize
BloomTrieNodes common.StorageSize bloomTrieNodes common.StorageSize
// Meta- and unaccounted data
metadata common.StorageSize
unaccounted common.StorageSize
) )
// Inspect key-value database first. // Inspect key-value database first.
for it.Next() { for it.Next() {
@ -228,12 +233,26 @@ func InspectDatabase(db ethdb.Database) error {
preimageSize += size preimageSize += size
case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
bloomBitsSize += size bloomBitsSize += size
case bytes.HasPrefix(key, []byte("clique-")) && len(key) == 7+common.HashLength:
cliqueSnapsSize += size
case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength: case bytes.HasPrefix(key, []byte("cht-")) && len(key) == 4+common.HashLength:
ChtTrieNodes += size chtTrieNodes += size
case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength: case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength:
BloomTrieNodes += size bloomTrieNodes += size
case len(key) == common.HashLength: case len(key) == common.HashLength:
trieSize += size trieSize += size
default:
var accounted bool
for _, meta := range [][]byte{databaseVerisionKey, headHeaderKey, headBlockKey, headFastBlockKey, fastTrieProgressKey, ancientKey} {
if bytes.Equal(key, meta) {
metadata += size
accounted = true
break
}
}
if !accounted {
unaccounted += size
}
} }
count += 1 count += 1
if count%1000 == 0 && time.Since(logged) > 8*time.Second { if count%1000 == 0 && time.Since(logged) > 8*time.Second {
@ -261,18 +280,24 @@ func InspectDatabase(db ethdb.Database) error {
{"Key-Value store", "Bloombit index", bloomBitsSize.String()}, {"Key-Value store", "Bloombit index", bloomBitsSize.String()},
{"Key-Value store", "Trie nodes", trieSize.String()}, {"Key-Value store", "Trie nodes", trieSize.String()},
{"Key-Value store", "Trie preimages", preimageSize.String()}, {"Key-Value store", "Trie preimages", preimageSize.String()},
{"Key-Value store", "Clique snapshots", cliqueSnapsSize.String()},
{"Key-Value store", "Singleton metadata", metadata.String()},
{"Ancient store", "Headers", ancientHeaders.String()}, {"Ancient store", "Headers", ancientHeaders.String()},
{"Ancient store", "Bodies", ancientBodies.String()}, {"Ancient store", "Bodies", ancientBodies.String()},
{"Ancient store", "Receipts", ancientReceipts.String()}, {"Ancient store", "Receipts", ancientReceipts.String()},
{"Ancient store", "Difficulties", ancientTds.String()}, {"Ancient store", "Difficulties", ancientTds.String()},
{"Ancient store", "Block number->hash", ancientHashes.String()}, {"Ancient store", "Block number->hash", ancientHashes.String()},
{"Light client", "CHT trie nodes", ChtTrieNodes.String()}, {"Light client", "CHT trie nodes", chtTrieNodes.String()},
{"Light client", "Bloom trie nodes", BloomTrieNodes.String()}, {"Light client", "Bloom trie nodes", bloomTrieNodes.String()},
} }
table := tablewriter.NewWriter(os.Stdout) table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Database", "Category", "Size"}) table.SetHeader([]string{"Database", "Category", "Size"})
table.SetFooter([]string{"", "Total", total.String()}) table.SetFooter([]string{"", "Total", total.String()})
table.AppendBulk(stats) table.AppendBulk(stats)
table.Render() table.Render()
if unaccounted > 0 {
log.Error("Database contains unaccounted data", "size", unaccounted)
}
return nil return nil
} }

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
) )
@ -52,11 +53,6 @@ const (
// storage. // storage.
freezerRecheckInterval = time.Minute freezerRecheckInterval = time.Minute
// freezerBlockGraduation is the number of confirmations a block must achieve
// before it becomes elligible for chain freezing. This must exceed any chain
// reorg depth, since the freezer also deletes all block siblings.
freezerBlockGraduation = 90000
// freezerBatchLimit is the maximum number of blocks to freeze in one batch // freezerBatchLimit is the maximum number of blocks to freeze in one batch
// before doing an fsync and deleting it from the key-value store. // before doing an fsync and deleting it from the key-value store.
freezerBatchLimit = 30000 freezerBatchLimit = 30000
@ -268,12 +264,12 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
time.Sleep(freezerRecheckInterval) time.Sleep(freezerRecheckInterval)
continue continue
case *number < freezerBlockGraduation: case *number < params.ImmutabilityThreshold:
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", freezerBlockGraduation) log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
time.Sleep(freezerRecheckInterval) time.Sleep(freezerRecheckInterval)
continue continue
case *number-freezerBlockGraduation <= f.frozen: case *number-params.ImmutabilityThreshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
time.Sleep(freezerRecheckInterval) time.Sleep(freezerRecheckInterval)
continue continue
@ -285,7 +281,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
continue continue
} }
// Seems we have data ready to be frozen, process in usable batches // Seems we have data ready to be frozen, process in usable batches
limit := *number - freezerBlockGraduation limit := *number - params.ImmutabilityThreshold
if limit-f.frozen > freezerBatchLimit { if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit limit = f.frozen + freezerBatchLimit
} }

@ -35,7 +35,7 @@ func init() {
// Gets a chunk of data, filled with 'b' // Gets a chunk of data, filled with 'b'
func getChunk(size int, b int) []byte { func getChunk(size int, b int) []byte {
data := make([]byte, size) data := make([]byte, size)
for i, _ := range data { for i := range data {
data[i] = byte(b) data[i] = byte(b)
} }
return data return data

@ -25,7 +25,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -46,7 +46,6 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request
MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation
rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
@ -60,6 +59,7 @@ var (
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain
maxForkAncestry uint64 = params.ImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
@ -439,7 +439,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode) log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
defer func(start time.Time) { defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now()) }(time.Now())
// Look up the sync boundaries: the common ancestor and the target block // Look up the sync boundaries: the common ancestor and the target block
@ -491,10 +491,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// The peer would start to feed us valid blocks until head, resulting in all of // The peer would start to feed us valid blocks until head, resulting in all of
// the blocks might be written into the ancient store. A following mini-reorg // the blocks might be written into the ancient store. A following mini-reorg
// could cause issues. // could cause issues.
if d.checkpoint != 0 && d.checkpoint > MaxForkAncestry+1 { if d.checkpoint != 0 && d.checkpoint > maxForkAncestry+1 {
d.ancientLimit = d.checkpoint d.ancientLimit = d.checkpoint
} else if height > MaxForkAncestry+1 { } else if height > maxForkAncestry+1 {
d.ancientLimit = height - MaxForkAncestry - 1 d.ancientLimit = height - maxForkAncestry - 1
} }
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store, // If a part of blockchain data has already been written into active store,
@ -725,9 +725,9 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
// Recap floor value for binary search // Recap floor value for binary search
if localHeight >= MaxForkAncestry { if localHeight >= maxForkAncestry {
// We're above the max reorg threshold, find the earliest fork point // We're above the max reorg threshold, find the earliest fork point
floor = int64(localHeight - MaxForkAncestry) floor = int64(localHeight - maxForkAncestry)
} }
// If we're doing a light sync, ensure the floor doesn't go below the CHT, as // If we're doing a light sync, ensure the floor doesn't go below the CHT, as
// all headers before that point will be missing. // all headers before that point will be missing.

@ -37,7 +37,7 @@ import (
// Reduce some of the parameters to make the tester faster. // Reduce some of the parameters to make the tester faster.
func init() { func init() {
MaxForkAncestry = uint64(10000) maxForkAncestry = 10000
blockCacheItems = 1024 blockCacheItems = 1024
fsHeaderContCheck = 500 * time.Millisecond fsHeaderContCheck = 500 * time.Millisecond
} }

@ -45,7 +45,7 @@ var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
func init() { func init() {
var forkLen = int(MaxForkAncestry + 50) var forkLen = int(maxForkAncestry + 50)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(3) wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()

@ -46,4 +46,10 @@ const (
// HelperTrieProcessConfirmations is the number of confirmations before a HelperTrie // HelperTrieProcessConfirmations is the number of confirmations before a HelperTrie
// is generated // is generated
HelperTrieProcessConfirmations = 256 HelperTrieProcessConfirmations = 256
// ImmutabilityThreshold is the number of blocks after which a chain segment is
// considered immutable (i.e. soft finality). It is used by the downloader as a
// hard limit against deep ancestors, by the blockchain against deep reorgs, by
// the freezer as the cutoff treshold and by clique as the snapshot trust limit.
ImmutabilityThreshold = 90000
) )

@ -118,14 +118,14 @@ func (b *SyncBloom) init(database ethdb.Iteratee) {
it.Release() it.Release()
it = database.NewIteratorWithStart(key) it = database.NewIteratorWithStart(key)
log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", time.Since(start)) log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
swap = time.Now() swap = time.Now()
} }
} }
it.Release() it.Release()
// Mark the bloom filter inited and return // Mark the bloom filter inited and return
log.Info("Initialized fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", time.Since(start)) log.Info("Initialized fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
atomic.StoreUint32(&b.inited, 1) atomic.StoreUint32(&b.inited, 1)
} }