core: use finalized block as the chain freeze indicator (#28683)
* core: use finalized block as the chain freeze indicator * core/rawdb: use max(finality, head-90k) as chain freezing threshold * core/rawdb: fix tests * core/rawdb: fix lint * core/rawdb: address comments from peter * core/rawdb: fix typo
This commit is contained in:
parent
a97d622588
commit
ca473b81cb
@ -1757,7 +1757,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
|
|||||||
|
|
||||||
func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) {
|
func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) {
|
||||||
// It's hard to follow the test case, visualize the input
|
// It's hard to follow the test case, visualize the input
|
||||||
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
|
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
|
||||||
// fmt.Println(tt.dump(true))
|
// fmt.Println(tt.dump(true))
|
||||||
|
|
||||||
// Create a temporary persistent database
|
// Create a temporary persistent database
|
||||||
@ -1830,10 +1830,14 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
|
|||||||
}
|
}
|
||||||
// Force run a freeze cycle
|
// Force run a freeze cycle
|
||||||
type freezer interface {
|
type freezer interface {
|
||||||
Freeze(threshold uint64) error
|
Freeze() error
|
||||||
Ancients() (uint64, error)
|
Ancients() (uint64, error)
|
||||||
}
|
}
|
||||||
db.(freezer).Freeze(tt.freezeThreshold)
|
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
|
||||||
|
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
|
||||||
|
chain.SetFinalized(canonblocks[int(final)-1].Header())
|
||||||
|
}
|
||||||
|
db.(freezer).Freeze()
|
||||||
|
|
||||||
// Set the simulated pivot block
|
// Set the simulated pivot block
|
||||||
if tt.pivotBlock != nil {
|
if tt.pivotBlock != nil {
|
||||||
|
@ -2044,10 +2044,14 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
|
|||||||
|
|
||||||
// Force run a freeze cycle
|
// Force run a freeze cycle
|
||||||
type freezer interface {
|
type freezer interface {
|
||||||
Freeze(threshold uint64) error
|
Freeze() error
|
||||||
Ancients() (uint64, error)
|
Ancients() (uint64, error)
|
||||||
}
|
}
|
||||||
db.(freezer).Freeze(tt.freezeThreshold)
|
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
|
||||||
|
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
|
||||||
|
chain.SetFinalized(canonblocks[int(final)-1].Header())
|
||||||
|
}
|
||||||
|
db.(freezer).Freeze()
|
||||||
|
|
||||||
// Set the simulated pivot block
|
// Set the simulated pivot block
|
||||||
if tt.pivotBlock != nil {
|
if tt.pivotBlock != nil {
|
||||||
|
@ -17,9 +17,9 @@
|
|||||||
package rawdb
|
package rawdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -43,8 +43,6 @@ const (
|
|||||||
// The background thread will keep moving ancient chain segments from key-value
|
// The background thread will keep moving ancient chain segments from key-value
|
||||||
// database to flat files for saving space on live database.
|
// database to flat files for saving space on live database.
|
||||||
type chainFreezer struct {
|
type chainFreezer struct {
|
||||||
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
|
|
||||||
|
|
||||||
*Freezer
|
*Freezer
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@ -57,13 +55,11 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cf := chainFreezer{
|
return &chainFreezer{
|
||||||
Freezer: freezer,
|
Freezer: freezer,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
trigger: make(chan chan struct{}),
|
trigger: make(chan chan struct{}),
|
||||||
}
|
}, nil
|
||||||
cf.threshold.Store(params.FullImmutabilityThreshold)
|
|
||||||
return &cf, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the chain freezer instance and terminates the background thread.
|
// Close closes the chain freezer instance and terminates the background thread.
|
||||||
@ -77,6 +73,57 @@ func (f *chainFreezer) Close() error {
|
|||||||
return f.Freezer.Close()
|
return f.Freezer.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readHeadNumber returns the number of chain head block. 0 is returned if the
|
||||||
|
// block is unknown or not available yet.
|
||||||
|
func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 {
|
||||||
|
hash := ReadHeadBlockHash(db)
|
||||||
|
if hash == (common.Hash{}) {
|
||||||
|
log.Error("Head block is not reachable")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
number := ReadHeaderNumber(db, hash)
|
||||||
|
if number == nil {
|
||||||
|
log.Error("Number of head block is missing")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return *number
|
||||||
|
}
|
||||||
|
|
||||||
|
// readFinalizedNumber returns the number of finalized block. 0 is returned
|
||||||
|
// if the block is unknown or not available yet.
|
||||||
|
func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 {
|
||||||
|
hash := ReadFinalizedBlockHash(db)
|
||||||
|
if hash == (common.Hash{}) {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
number := ReadHeaderNumber(db, hash)
|
||||||
|
if number == nil {
|
||||||
|
log.Error("Number of finalized block is missing")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return *number
|
||||||
|
}
|
||||||
|
|
||||||
|
// freezeThreshold returns the threshold for chain freezing. It's determined
|
||||||
|
// by formula: max(finality, HEAD-params.FullImmutabilityThreshold).
|
||||||
|
func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) {
|
||||||
|
var (
|
||||||
|
head = f.readHeadNumber(db)
|
||||||
|
final = f.readFinalizedNumber(db)
|
||||||
|
headLimit uint64
|
||||||
|
)
|
||||||
|
if head > params.FullImmutabilityThreshold {
|
||||||
|
headLimit = head - params.FullImmutabilityThreshold
|
||||||
|
}
|
||||||
|
if final == 0 && headLimit == 0 {
|
||||||
|
return 0, errors.New("freezing threshold is not available")
|
||||||
|
}
|
||||||
|
if final > headLimit {
|
||||||
|
return final, nil
|
||||||
|
}
|
||||||
|
return headLimit, nil
|
||||||
|
}
|
||||||
|
|
||||||
// freeze is a background thread that periodically checks the blockchain for any
|
// freeze is a background thread that periodically checks the blockchain for any
|
||||||
// import progress and moves ancient data from the fast database into the freezer.
|
// import progress and moves ancient data from the fast database into the freezer.
|
||||||
//
|
//
|
||||||
@ -114,60 +161,39 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Retrieve the freezing threshold.
|
threshold, err := f.freezeThreshold(nfdb)
|
||||||
hash := ReadHeadBlockHash(nfdb)
|
if err != nil {
|
||||||
if hash == (common.Hash{}) {
|
|
||||||
log.Debug("Current full block hash unavailable") // new chain, empty database
|
|
||||||
backoff = true
|
backoff = true
|
||||||
|
log.Debug("Current full block not old enough to freeze", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
number := ReadHeaderNumber(nfdb, hash)
|
|
||||||
threshold := f.threshold.Load()
|
|
||||||
frozen := f.frozen.Load()
|
frozen := f.frozen.Load()
|
||||||
switch {
|
|
||||||
case number == nil:
|
|
||||||
log.Error("Current full block number unavailable", "hash", hash)
|
|
||||||
backoff = true
|
|
||||||
continue
|
|
||||||
|
|
||||||
case *number < threshold:
|
// Short circuit if the blocks below threshold are already frozen.
|
||||||
log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold)
|
if frozen != 0 && frozen-1 >= threshold {
|
||||||
backoff = true
|
|
||||||
continue
|
|
||||||
|
|
||||||
case *number-threshold <= frozen:
|
|
||||||
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen)
|
|
||||||
backoff = true
|
backoff = true
|
||||||
|
log.Debug("Ancient blocks frozen already", "threshold", threshold, "frozen", frozen)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
head := ReadHeader(nfdb, hash, *number)
|
|
||||||
if head == nil {
|
|
||||||
log.Error("Current full block unavailable", "number", *number, "hash", hash)
|
|
||||||
backoff = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Seems we have data ready to be frozen, process in usable batches
|
// Seems we have data ready to be frozen, process in usable batches
|
||||||
var (
|
var (
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
first, _ = f.Ancients()
|
first = frozen // the first block to freeze
|
||||||
limit = *number - threshold
|
last = threshold // the last block to freeze
|
||||||
)
|
)
|
||||||
if limit-first > freezerBatchLimit {
|
if last-first+1 > freezerBatchLimit {
|
||||||
limit = first + freezerBatchLimit
|
last = freezerBatchLimit + first - 1
|
||||||
}
|
}
|
||||||
ancients, err := f.freezeRange(nfdb, first, limit)
|
ancients, err := f.freezeRange(nfdb, first, last)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error in block freeze operation", "err", err)
|
log.Error("Error in block freeze operation", "err", err)
|
||||||
backoff = true
|
backoff = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Batch of blocks have been frozen, flush them before wiping from leveldb
|
// Batch of blocks have been frozen, flush them before wiping from leveldb
|
||||||
if err := f.Sync(); err != nil {
|
if err := f.Sync(); err != nil {
|
||||||
log.Crit("Failed to flush frozen tables", "err", err)
|
log.Crit("Failed to flush frozen tables", "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wipe out all data from the active database
|
// Wipe out all data from the active database
|
||||||
batch := db.NewBatch()
|
batch := db.NewBatch()
|
||||||
for i := 0; i < len(ancients); i++ {
|
for i := 0; i < len(ancients); i++ {
|
||||||
@ -250,8 +276,11 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// freezeRange moves a batch of chain segments from the fast database to the freezer.
|
||||||
|
// The parameters (number, limit) specify the relevant block range, both of which
|
||||||
|
// are included.
|
||||||
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
|
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
|
||||||
hashes = make([]common.Hash, 0, limit-number)
|
hashes = make([]common.Hash, 0, limit-number+1)
|
||||||
|
|
||||||
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
|
_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
|
||||||
for ; number <= limit; number++ {
|
for ; number <= limit; number++ {
|
||||||
@ -293,11 +322,9 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
|
|||||||
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
|
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
|
||||||
return fmt.Errorf("can't write td to Freezer: %v", err)
|
return fmt.Errorf("can't write td to Freezer: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hashes = append(hashes, hash)
|
hashes = append(hashes, hash)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return hashes, err
|
return hashes, err
|
||||||
}
|
}
|
||||||
|
@ -66,16 +66,10 @@ func (frdb *freezerdb) Close() error {
|
|||||||
// Freeze is a helper method used for external testing to trigger and block until
|
// Freeze is a helper method used for external testing to trigger and block until
|
||||||
// a freeze cycle completes, without having to sleep for a minute to trigger the
|
// a freeze cycle completes, without having to sleep for a minute to trigger the
|
||||||
// automatic background run.
|
// automatic background run.
|
||||||
func (frdb *freezerdb) Freeze(threshold uint64) error {
|
func (frdb *freezerdb) Freeze() error {
|
||||||
if frdb.AncientStore.(*chainFreezer).readonly {
|
if frdb.AncientStore.(*chainFreezer).readonly {
|
||||||
return errReadOnly
|
return errReadOnly
|
||||||
}
|
}
|
||||||
// Set the freezer threshold to a temporary value
|
|
||||||
defer func(old uint64) {
|
|
||||||
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
|
|
||||||
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
|
|
||||||
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)
|
|
||||||
|
|
||||||
// Trigger a freeze cycle and block until it's done
|
// Trigger a freeze cycle and block until it's done
|
||||||
trigger := make(chan struct{}, 1)
|
trigger := make(chan struct{}, 1)
|
||||||
frdb.AncientStore.(*chainFreezer).trigger <- trigger
|
frdb.AncientStore.(*chainFreezer).trigger <- trigger
|
||||||
|
Loading…
Reference in New Issue
Block a user