[R4R]offline block prune (#543)

* offline block prune

* update

* update

* update and add unit test

* addressed comments from walt

* Addressed comments from walt and Igor

* ensure MPT and snapshot matched

* add one more parameter to indicate blockprune

* update the logic of creating freezerDb

* update flag command description

* expose the function for db inspect the offset/startBlockNumber

* add flags to inspect prune info

* rename flag of reserved-recent-blocks to block-amount-reserved

* addressed comments from walt

* handle the case of command interruption

* refined goimports

* addressed comments from walt

* change the logic as restarting prune after interruption

* addressed comments

* reclaimed freezer logic

* introduce flag to enable/disable check between MPT and snapshot

* update the logic of frozen field in freezerDB

* update the code in all places related to freezer change

* addressed comments from dylan

* update the logic for backup block difficulty

* addressed comments from dylan
This commit is contained in:
John 2022-01-19 18:07:49 +08:00 committed by GitHub
parent ebc39330c5
commit 476d5200f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 879 additions and 65 deletions

2
.gitignore vendored

@ -48,3 +48,5 @@ profile.cov
/dashboard/assets/package-lock.json
**/yarn-error.log
cmd/geth/node/
cmd/geth/__debug_bin

@ -458,7 +458,7 @@ func importPreimages(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
db := utils.MakeChainDatabase(ctx, stack, false, false)
start := time.Now()
if err := utils.ImportPreimages(db, ctx.Args().First()); err != nil {
@ -477,7 +477,7 @@ func exportPreimages(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
start := time.Now()
if err := utils.ExportPreimages(db, ctx.Args().First()); err != nil {
@ -491,7 +491,7 @@ func dump(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
for _, arg := range ctx.Args() {
var header *types.Header
if hashish(arg) {

@ -62,6 +62,7 @@ Remove blockchain and state databases`,
dbPutCmd,
dbGetSlotsCmd,
dbDumpFreezerIndex,
ancientInspectCmd,
},
}
dbInspectCmd = cli.Command{
@ -195,6 +196,16 @@ WARNING: This is a low-level operation which may cause database corruption!`,
},
Description: "This command displays information about the freezer index.",
}
ancientInspectCmd = cli.Command{
Action: utils.MigrateFlags(ancientInspect),
Name: "inspect-reserved-oldest-blocks",
Flags: []cli.Flag{
utils.DataDirFlag,
},
Usage: "Inspect the ancientStore information",
Description: `This commands will read current offset from kvdb, which is the current offset and starting BlockNumber
of ancientStore, will also displays the reserved number of blocks in ancientStore `,
}
)
func removeDB(ctx *cli.Context) error {
@ -282,12 +293,21 @@ func inspect(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
defer db.Close()
return rawdb.InspectDatabase(db, prefix, start)
}
func ancientInspect(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true, true)
defer db.Close()
return rawdb.AncientInspect(db)
}
func showLeveldbStats(db ethdb.Stater) {
if stats, err := db.Stat("leveldb.stats"); err != nil {
log.Warn("Failed to read database stats", "error", err)
@ -305,7 +325,7 @@ func dbStats(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
defer db.Close()
showLeveldbStats(db)
@ -316,7 +336,7 @@ func dbCompact(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
db := utils.MakeChainDatabase(ctx, stack, false, false)
defer db.Close()
log.Info("Stats before compaction")
@ -340,7 +360,7 @@ func dbGet(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
defer db.Close()
key, err := hexutil.Decode(ctx.Args().Get(0))
@ -365,7 +385,7 @@ func dbDelete(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
db := utils.MakeChainDatabase(ctx, stack, false, false)
defer db.Close()
key, err := hexutil.Decode(ctx.Args().Get(0))
@ -392,7 +412,7 @@ func dbPut(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
db := utils.MakeChainDatabase(ctx, stack, false, false)
defer db.Close()
var (
@ -426,7 +446,7 @@ func dbDumpTrie(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true)
db := utils.MakeChainDatabase(ctx, stack, true, false)
defer db.Close()
var (
root []byte

@ -165,6 +165,8 @@ var (
utils.MinerNotifyFullFlag,
configFileFlag,
utils.CatalystFlag,
utils.BlockAmountReserved,
utils.CheckSnapshotWithMPT,
}
rpcFlags = []cli.Flag{

242
cmd/geth/pruneblock_test.go Normal file

@ -0,0 +1,242 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"bytes"
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
"os"
"path/filepath"
"testing"
"time"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
var (
canonicalSeed = 1
blockPruneBackUpBlockNumber = 128
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
balance = big.NewInt(10000000)
gspec = &core.Genesis{Config: params.TestChainConfig, Alloc: core.GenesisAlloc{address: {Balance: balance}}}
signer = types.LatestSigner(gspec.Config)
config = &core.CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 0, // Disable snapshot
TriesInMemory: 128,
}
engine = ethash.NewFullFaker()
)
func TestOfflineBlockPrune(t *testing.T) {
//Corner case for 0 remain in ancinetStore.
testOfflineBlockPruneWithAmountReserved(t, 0)
//General case.
testOfflineBlockPruneWithAmountReserved(t, 100)
}
func testOfflineBlockPruneWithAmountReserved(t *testing.T, amountReserved uint64) {
datadir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Failed to create temporary datadir: %v", err)
}
os.RemoveAll(datadir)
chaindbPath := filepath.Join(datadir, "chaindata")
oldAncientPath := filepath.Join(chaindbPath, "ancient")
newAncientPath := filepath.Join(chaindbPath, "ancient_back")
db, blocks, blockList, receiptsList, externTdList, startBlockNumber, _ := BlockchainCreator(t, chaindbPath, oldAncientPath, amountReserved)
node, _ := startEthService(t, gspec, blocks, chaindbPath)
defer node.Close()
//Initialize a block pruner for pruning, only remain amountReserved blocks backward.
testBlockPruner := pruner.NewBlockPruner(db, node, oldAncientPath, newAncientPath, amountReserved)
if err != nil {
t.Fatalf("failed to make new blockpruner: %v", err)
}
if err := testBlockPruner.BlockPruneBackUp(chaindbPath, 512, utils.MakeDatabaseHandles(), "", false, false); err != nil {
t.Fatalf("Failed to back up block: %v", err)
}
dbBack, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, newAncientPath, "", false, true, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
defer dbBack.Close()
//check against if the backup data matched original one
for blockNumber := startBlockNumber; blockNumber < startBlockNumber+amountReserved; blockNumber++ {
blockHash := rawdb.ReadCanonicalHash(dbBack, blockNumber)
block := rawdb.ReadBlock(dbBack, blockHash, blockNumber)
if block.Hash() != blockHash {
t.Fatalf("block data did not match between oldDb and backupDb")
}
if blockList[blockNumber-startBlockNumber].Hash() != blockHash {
t.Fatalf("block data did not match between oldDb and backupDb")
}
receipts := rawdb.ReadRawReceipts(dbBack, blockHash, blockNumber)
if err := checkReceiptsRLP(receipts, receiptsList[blockNumber-startBlockNumber]); err != nil {
t.Fatalf("receipts did not match between oldDb and backupDb")
}
// // Calculate the total difficulty of the block
td := rawdb.ReadTd(dbBack, blockHash, blockNumber)
if td == nil {
t.Fatalf("Failed to ReadTd: %v", consensus.ErrUnknownAncestor)
}
if td.Cmp(externTdList[blockNumber-startBlockNumber]) != 0 {
t.Fatalf("externTd did not match between oldDb and backupDb")
}
}
//check if ancientDb freezer replaced successfully
testBlockPruner.AncientDbReplacer()
if _, err := os.Stat(newAncientPath); err != nil {
if !os.IsNotExist(err) {
t.Fatalf("ancientDb replaced unsuccessfully")
}
}
if _, err := os.Stat(oldAncientPath); err != nil {
t.Fatalf("ancientDb replaced unsuccessfully")
}
}
func BlockchainCreator(t *testing.T, chaindbPath, AncientPath string, blockRemain uint64) (ethdb.Database, []*types.Block, []*types.Block, []types.Receipts, []*big.Int, uint64, *core.BlockChain) {
//create a database with ancient freezer
db, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, AncientPath, "", false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
defer db.Close()
genesis := gspec.MustCommit(db)
// Initialize a fresh chain with only a genesis block
blockchain, err := core.NewBlockChain(db, config, gspec.Config, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
// Make chain starting from genesis
blocks, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 500, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{0: byte(canonicalSeed), 19: byte(i)})
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil), signer, key)
if err != nil {
panic(err)
}
block.AddTx(tx)
block.SetDifficulty(big.NewInt(1000000))
})
if _, err := blockchain.InsertChain(blocks); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64) error
Ancients() (uint64, error)
}
db.(freezer).Freeze(10)
frozen, err := db.Ancients()
//make sure there're frozen items
if err != nil || frozen == 0 {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if frozen < blockRemain {
t.Fatalf("block amount is not enough for pruning: %v", err)
}
oldOffSet := rawdb.ReadOffSetOfCurrentAncientFreezer(db)
// Get the actual start block number.
startBlockNumber := frozen - blockRemain + oldOffSet
// Initialize the slice to buffer the block data left.
blockList := make([]*types.Block, 0, blockPruneBackUpBlockNumber)
receiptsList := make([]types.Receipts, 0, blockPruneBackUpBlockNumber)
externTdList := make([]*big.Int, 0, blockPruneBackUpBlockNumber)
// All ancient data within the most recent 128 blocks write into memory buffer for future new ancient_back directory usage.
for blockNumber := startBlockNumber; blockNumber < frozen+oldOffSet; blockNumber++ {
blockHash := rawdb.ReadCanonicalHash(db, blockNumber)
block := rawdb.ReadBlock(db, blockHash, blockNumber)
blockList = append(blockList, block)
receipts := rawdb.ReadRawReceipts(db, blockHash, blockNumber)
receiptsList = append(receiptsList, receipts)
// Calculate the total difficulty of the block
td := rawdb.ReadTd(db, blockHash, blockNumber)
if td == nil {
t.Fatalf("Failed to ReadTd: %v", consensus.ErrUnknownAncestor)
}
externTdList = append(externTdList, td)
}
return db, blocks, blockList, receiptsList, externTdList, startBlockNumber, blockchain
}
func checkReceiptsRLP(have, want types.Receipts) error {
if len(have) != len(want) {
return fmt.Errorf("receipts sizes mismatch: have %d, want %d", len(have), len(want))
}
for i := 0; i < len(want); i++ {
rlpHave, err := rlp.EncodeToBytes(have[i])
if err != nil {
return err
}
rlpWant, err := rlp.EncodeToBytes(want[i])
if err != nil {
return err
}
if !bytes.Equal(rlpHave, rlpWant) {
return fmt.Errorf("receipt #%d: receipt mismatch: have %s, want %s", i, hex.EncodeToString(rlpHave), hex.EncodeToString(rlpWant))
}
}
return nil
}
// startEthService creates a full node instance for testing.
func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block, chaindbPath string) (*node.Node, *eth.Ethereum) {
t.Helper()
n, err := node.New(&node.Config{DataDir: chaindbPath})
if err != nil {
t.Fatal("can't create node:", err)
}
if err := n.Start(); err != nil {
t.Fatal("can't start node:", err)
}
return n, nil
}

@ -19,8 +19,14 @@ package main
import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/prometheus/tsdb/fileutil"
cli "gopkg.in/urfave/cli.v1"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -28,10 +34,11 @@ import (
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
cli "gopkg.in/urfave/cli.v1"
)
var (
@ -78,6 +85,30 @@ WARNING: It's necessary to delete the trie clean cache after the pruning.
If you specify another directory for the trie clean cache via "--cache.trie.journal"
during the use of Geth, please also specify it here for correct deletion. Otherwise
the trie clean cache with default directory will be deleted.
`,
},
{
Name: "prune-block",
Usage: "Prune block data offline",
Action: utils.MigrateFlags(pruneBlock),
Category: "MISCELLANEOUS COMMANDS",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.AncientFlag,
utils.BlockAmountReserved,
utils.TriesInMemoryFlag,
utils.CheckSnapshotWithMPT,
},
Description: `
geth offline prune-block for block data in ancientdb.
The amount of blocks expected for remaining after prune can be specified via block-amount-reserved in this command,
will prune and only remain the specified amount of old block data in ancientdb.
the brief workflow is to backup the the number of this specified amount blocks backward in original ancientdb
into new ancient_backup, then delete the original ancientdb dir and rename the ancient_backup to original one for replacement,
finally assemble the statedb and new ancientDb together.
The purpose of doing it is because the block data will be moved into the ancient store when it
becomes old enough(exceed the Threshold 90000), the disk usage will be very large over time, and is occupied mainly by ancientDb,
so it's very necessary to do block data prune, this feature will handle it.
`,
},
{
@ -149,11 +180,164 @@ It's also usable without snapshot enabled.
}
)
func accessDb(ctx *cli.Context, stack *node.Node) (ethdb.Database, error) {
//The layer of tries trees that keep in memory.
TriesInMemory := int(ctx.GlobalUint64(utils.TriesInMemoryFlag.Name))
chaindb := utils.MakeChainDatabase(ctx, stack, false, true)
defer chaindb.Close()
if !ctx.GlobalBool(utils.CheckSnapshotWithMPT.Name) {
return chaindb, nil
}
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
return nil, errors.New("failed to load head block")
}
headHeader := headBlock.Header()
//Make sure the MPT and snapshot matches before pruning, otherwise the node can not start.
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, TriesInMemory, headBlock.Root(), false, false, false)
if err != nil {
log.Error("snaptree error", "err", err)
return nil, err // The relevant snapshot(s) might not exist
}
// Use the HEAD-(n-1) as the target root. The reason for picking it is:
// - in most of the normal cases, the related state is available
// - the probability of this layer being reorg is very low
// Retrieve all snapshot layers from the current HEAD.
// In theory there are n difflayers + 1 disk layer present,
// so n diff layers are expected to be returned.
layers := snaptree.Snapshots(headHeader.Root, TriesInMemory, true)
if len(layers) != TriesInMemory {
// Reject if the accumulated diff layers are less than n. It
// means in most of normal cases, there is no associated state
// with bottom-most diff layer.
log.Error("snapshot layers != TriesInMemory", "err", err)
return nil, fmt.Errorf("snapshot not old enough yet: need %d more blocks", TriesInMemory-len(layers))
}
// Use the bottom-most diff layer as the target
targetRoot := layers[len(layers)-1].Root()
// Ensure the root is really present. The weak assumption
// is the presence of root can indicate the presence of the
// entire trie.
if blob := rawdb.ReadTrieNode(chaindb, targetRoot); len(blob) == 0 {
// The special case is for clique based networks(rinkeby, goerli
// and some other private networks), it's possible that two
// consecutive blocks will have same root. In this case snapshot
// difflayer won't be created. So HEAD-(n-1) may not paired with
// head-(n-1) layer. Instead the paired layer is higher than the
// bottom-most diff layer. Try to find the bottom-most snapshot
// layer with state available.
//
// Note HEAD is ignored. Usually there is the associated
// state available, but we don't want to use the topmost state
// as the pruning target.
var found bool
for i := len(layers) - 2; i >= 1; i-- {
if blob := rawdb.ReadTrieNode(chaindb, layers[i].Root()); len(blob) != 0 {
targetRoot = layers[i].Root()
found = true
log.Info("Selecting middle-layer as the pruning target", "root", targetRoot, "depth", i)
break
}
}
if !found {
if blob := rawdb.ReadTrieNode(chaindb, snaptree.DiskRoot()); len(blob) != 0 {
targetRoot = snaptree.DiskRoot()
found = true
log.Info("Selecting disk-layer as the pruning target", "root", targetRoot)
}
}
if !found {
if len(layers) > 0 {
log.Error("no snapshot paired state")
return nil, errors.New("no snapshot paired state")
}
return nil, fmt.Errorf("associated state[%x] is not present", targetRoot)
}
} else {
if len(layers) > 0 {
log.Info("Selecting bottom-most difflayer as the pruning target", "root", targetRoot, "height", headHeader.Number.Uint64()-uint64(len(layers)-1))
} else {
log.Info("Selecting user-specified state as the pruning target", "root", targetRoot)
}
}
return chaindb, nil
}
func pruneBlock(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
defer stack.Close()
blockAmountReserved := ctx.GlobalUint64(utils.BlockAmountReserved.Name)
chaindb, err := accessDb(ctx, stack)
if err != nil {
return err
}
var newAncientPath string
oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name)
if !filepath.IsAbs(oldAncientPath) {
oldAncientPath = stack.ResolvePath(oldAncientPath)
}
path, _ := filepath.Split(oldAncientPath)
if path == "" {
return errors.New("prune failed, did not specify the AncientPath")
}
newAncientPath = filepath.Join(path, "ancient_back")
blockpruner := pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)
lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK"))
if err != nil {
log.Error("file lock error", "err", err)
return err
}
if exist {
defer lock.Release()
log.Info("file lock existed, waiting for prune recovery and continue", "err", err)
if err := blockpruner.RecoverInterruption("chaindata", config.Eth.DatabaseCache, utils.MakeDatabaseHandles(), "", false); err != nil {
log.Error("Pruning failed", "err", err)
return err
}
log.Info("Block prune successfully")
return nil
}
if _, err := os.Stat(newAncientPath); err == nil {
// No file lock found for old ancientDB but new ancientDB exsisted, indicating the geth was interrupted
// after old ancientDB removal, this happened after backup successfully, so just rename the new ancientDB
if err := blockpruner.AncientDbReplacer(); err != nil {
log.Error("Failed to rename new ancient directory")
return err
}
log.Info("Block prune successfully")
return nil
}
name := "chaindata"
if err := blockpruner.BlockPruneBackUp(name, config.Eth.DatabaseCache, utils.MakeDatabaseHandles(), "", false, false); err != nil {
log.Error("Failed to back up block", "err", err)
return err
}
log.Info("backup block successfully")
//After backing up successfully, rename the new ancientdb name to the original one, and delete the old ancientdb
if err := blockpruner.AncientDbReplacer(); err != nil {
return err
}
lock.Release()
log.Info("Block prune successfully")
return nil
}
func pruneState(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, false)
chaindb := utils.MakeChainDatabase(ctx, stack, false, false)
pruner, err := pruner.NewPruner(chaindb, stack.ResolvePath(""), stack.ResolvePath(config.Eth.TrieCleanCacheJournal), ctx.GlobalUint64(utils.BloomFilterSizeFlag.Name), ctx.GlobalUint64(utils.TriesInMemoryFlag.Name))
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
@ -182,7 +366,7 @@ func verifyState(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
chaindb := utils.MakeChainDatabase(ctx, stack, true, false)
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")
@ -220,7 +404,7 @@ func traverseState(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
chaindb := utils.MakeChainDatabase(ctx, stack, true, false)
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")
@ -310,7 +494,7 @@ func traverseRawState(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
chaindb := utils.MakeChainDatabase(ctx, stack, true, false)
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")

@ -58,6 +58,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.LightKDFFlag,
utils.WhitelistFlag,
utils.TriesInMemoryFlag,
utils.BlockAmountReserved,
utils.CheckSnapshotWithMPT,
},
},
{

@ -33,6 +33,10 @@ import (
"text/template"
"time"
pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"gopkg.in/urfave/cli.v1"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
@ -66,9 +70,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/params"
pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"gopkg.in/urfave/cli.v1"
)
func init() {
@ -827,6 +828,16 @@ var (
Name: "catalyst",
Usage: "Catalyst mode (eth2 integration testing)",
}
BlockAmountReserved = cli.Uint64Flag{
Name: "block-amount-reserved",
Usage: "Sets the expected remained amount of blocks for offline block prune",
}
CheckSnapshotWithMPT = cli.BoolFlag{
Name: "check-snapshot-with-mpt",
Usage: "Enable checking between snapshot and MPT ",
}
)
// MakeDataDir retrieves the currently requested data directory, terminating
@ -1764,7 +1775,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(DataDirFlag.Name) {
// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
chaindb := MakeChainDatabase(ctx, stack, false) // TODO (MariusVanDerWijden) make this read only
chaindb := MakeChainDatabase(ctx, stack, false, false) // TODO (MariusVanDerWijden) make this read only
if rawdb.ReadCanonicalHash(chaindb, 0) != (common.Hash{}) {
cfg.Genesis = nil // fallback to db content
}
@ -1883,7 +1894,7 @@ func SplitTagsFlag(tagsFlag string) map[string]string {
}
// MakeChainDatabase open an LevelDB using the flags passed to the client and will hard crash if it fails.
func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.Database {
func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFreeze bool) ethdb.Database {
var (
cache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100
handles = MakeDatabaseHandles()
@ -1896,7 +1907,7 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.
chainDb, err = stack.OpenDatabase(name, cache, handles, "", readonly)
} else {
name := "chaindata"
chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly)
chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly, disableFreeze, false)
}
if err != nil {
Fatalf("Could not open database: %v", err)
@ -1926,7 +1937,7 @@ func MakeGenesis(ctx *cli.Context) *core.Genesis {
// MakeChain creates a chain manager from set command line flags.
func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chainDb ethdb.Database) {
var err error
chainDb = MakeChainDatabase(ctx, stack, false) // TODO(rjl493456442) support read-only database
chainDb = MakeChainDatabase(ctx, stack, false, false) // TODO(rjl493456442) support read-only database
config, _, err := core.SetupGenesisBlock(chainDb, MakeGenesis(ctx))
if err != nil {
Fatalf("%v", err)

@ -325,9 +325,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
rawdb.InitDatabaseFromFreezer(bc.db)
// If ancient database is not empty, reconstruct all missing
// indices in the background.
frozen, _ := bc.db.Ancients()
frozen, _ := bc.db.ItemAmountInAncient()
if frozen > 0 {
txIndexBlock = frozen
txIndexBlock, _ = bc.db.Ancients()
}
}
if err := bc.loadLastState(); err != nil {
@ -362,7 +362,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
if frozen, err := bc.db.ItemAmountInAncient(); err == nil && frozen > 0 {
frozen, err = bc.db.Ancients()
if err != nil {
return nil, err
}
var (
needRewind bool
low uint64

@ -1762,7 +1762,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
@ -1832,7 +1832,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
db.Close()
// Start a new blockchain back up and see where the repait leads us
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}

@ -1961,7 +1961,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}

@ -64,7 +64,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false)
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
@ -248,7 +248,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
db.Close()
// Start a new blockchain back up and see where the repair leads us
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false)
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false, false, false)
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}

@ -653,7 +653,7 @@ func TestFastVsFullChains(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -727,7 +727,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false)
db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1594,7 +1594,7 @@ func TestBlockchainRecovery(t *testing.T) {
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1651,7 +1651,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -1850,7 +1850,7 @@ func testInsertKnownChainData(t *testing.T, typ string) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(dir)
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false)
chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2130,7 +2130,7 @@ func TestTransactionIndices(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2158,7 +2158,7 @@ func TestTransactionIndices(t *testing.T) {
// Init block chain with external ancients, check all needed indices has been indexed.
limit := []uint64{0, 32, 64, 128}
for _, l := range limit {
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2178,7 +2178,7 @@ func TestTransactionIndices(t *testing.T) {
}
// Reconstruct a block chain which only reserves HEAD-64 tx indices
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
@ -2257,7 +2257,7 @@ func TestSkipStaleTxIndicesInFastSync(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}

@ -440,7 +440,7 @@ func TestAncientStorage(t *testing.T) {
}
defer os.Remove(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}

@ -35,7 +35,7 @@ import (
// injects into the database the block hash->number mappings.
func InitDatabaseFromFreezer(db ethdb.Database) {
// If we can't access the freezer or it's empty, abort
frozen, err := db.Ancients()
frozen, err := db.ItemAmountInAncient()
if err != nil || frozen == 0 {
return
}
@ -44,8 +44,9 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
start = time.Now()
logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
hash common.Hash
offset = db.AncientOffSet()
)
for i := uint64(0); i < frozen; i++ {
for i := uint64(0) + offset; i < frozen+offset; i++ {
// Since the freezer has all data in sequential order on a file,
// it would be 'neat' to read more data in one go, and let the
// freezerdb return N items (e.g up to 1000 items per go)

@ -20,16 +20,18 @@ import (
"bytes"
"errors"
"fmt"
"math/big"
"os"
"sync/atomic"
"time"
"github.com/olekukonko/tablewriter"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
"github.com/olekukonko/tablewriter"
)
// freezerdb is a database wrapper that enabled freezer data retrievals.
@ -112,6 +114,11 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Ancients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ItemAmountInAncient() (uint64, error) {
return 0, errNotSupported
}
// AncientSize returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
@ -140,6 +147,10 @@ func (db *nofreezedb) SetDiffStore(diff ethdb.KeyValueStore) {
db.diffStore = diff
}
func (db *nofreezedb) AncientOffSet() uint64 {
return 0
}
// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
@ -148,15 +159,69 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
}
}
func ReadOffSetOfCurrentAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfCurrentAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}
func ReadOffSetOfLastAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfLastAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}
func WriteOffSetOfCurrentAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfCurrentAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}
func WriteOffSetOfLastAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfLastAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}
// NewFreezerDb only create a freezer without statedb.
func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*freezer, error) {
// Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and acientDB.
frdb, err := newFreezer(frz, namespace, readonly)
if err != nil {
return nil, err
}
frdb.offset = newOffSet
frdb.frozen += newOffSet
return frdb, nil
}
// NewDatabaseWithFreezer creates a high level database on top of a given key-
// value data store with a freezer moving immutable chain segments into cold
// storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly)
if err != nil {
return nil, err
}
var offset uint64
// The offset of ancientDB should be handled differently in different scenarios.
if isLastOffset {
offset = ReadOffSetOfLastAncientFreezer(db)
} else {
offset = ReadOffSetOfCurrentAncientFreezer(db)
}
frdb.offset = offset
// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
frdb.frozen += offset
// Since the freezer can be stored separately from the user's key-value database,
// there's a fairly high probability that the user requests invalid combinations
// of the freezer and database. Ensure that we don't shoot ourselves in the foot
@ -179,7 +244,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// If the genesis hash is empty, we have a new key-value store, so nothing to
// validate in this method. If, however, the genesis hash is not nil, compare
// it to the freezer content.
if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 {
// Only to check the followings when offset equal to 0, otherwise the block number
// in ancientdb did not start with 0, no genesis block in ancientdb as well.
if kvgenesis, _ := db.Get(headerHashKey(0)); offset == 0 && len(kvgenesis) > 0 {
if frozen, _ := frdb.Ancients(); frozen > 0 {
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
@ -221,8 +289,9 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// feezer.
}
}
// Freezer is consistent with the key-value database, permit combining the two
if !frdb.readonly {
if !disableFreeze && !frdb.readonly {
go frdb.freeze(db)
}
return &freezerdb{
@ -256,12 +325,12 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r
// NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a
// freezer moving immutable chain segments into cold storage.
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
kvdb, err := leveldb.New(file, cache, handles, namespace, readonly)
if err != nil {
return nil, err
}
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly)
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly, disableFreeze, isLastOffset)
if err != nil {
kvdb.Close()
return nil, err
@ -298,6 +367,35 @@ func (s *stat) Size() string {
func (s *stat) Count() string {
return s.count.String()
}
func AncientInspect(db ethdb.Database) error {
offset := counter(ReadOffSetOfCurrentAncientFreezer(db))
// Get number of ancient rows inside the freezer.
ancients := counter(0)
if count, err := db.ItemAmountInAncient(); err != nil {
log.Error("failed to get the items amount in ancientDB", "err", err)
return err
} else {
ancients = counter(count)
}
var endNumber counter
if offset+ancients <= 0 {
endNumber = 0
} else {
endNumber = offset + ancients - 1
}
stats := [][]string{
{"Offset/StartBlockNumber", "Offset/StartBlockNumber of ancientDB", offset.String()},
{"Amount of remained items in AncientStore", "Remaining items of ancientDB", ancients.String()},
{"The last BlockNumber within ancientDB", "The last BlockNumber", endNumber.String()},
}
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Database", "Category", "Items"})
table.SetFooter([]string{"", "AncientStore information", ""})
table.AppendBulk(stats)
table.Render()
return nil
}
// InspectDatabase traverses the entire database and checks the size
// of all different categories of data.
@ -431,9 +529,10 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
}
// Get number of ancient rows inside the freezer
ancients := counter(0)
if count, err := db.Ancients(); err == nil {
if count, err := db.ItemAmountInAncient(); err == nil {
ancients = counter(count)
}
// Display the database statistic.
stats := [][]string{
{"Key-Value store", "Headers", headers.Size(), headers.Count()},

@ -85,6 +85,8 @@ type freezer struct {
quit chan struct{}
closeOnce sync.Once
offset uint64 // Starting BlockNumber in current freezer
}
// newFreezer creates a chain freezer that moves ancient chain data into
@ -164,7 +166,7 @@ func (f *freezer) Close() error {
// in the freezer.
func (f *freezer) HasAncient(kind string, number uint64) (bool, error) {
if table := f.tables[kind]; table != nil {
return table.has(number), nil
return table.has(number - f.offset), nil
}
return false, nil
}
@ -172,7 +174,7 @@ func (f *freezer) HasAncient(kind string, number uint64) (bool, error) {
// Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
if table := f.tables[kind]; table != nil {
return table.Retrieve(number)
return table.Retrieve(number - f.offset)
}
return nil, errUnknownTable
}
@ -182,6 +184,16 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}
// ItemAmountInAncient returns the actual length of current ancientDB.
func (f *freezer) ItemAmountInAncient() (uint64, error) {
return atomic.LoadUint64(&f.frozen) - atomic.LoadUint64(&f.offset), nil
}
// AncientOffSet returns the offset of current ancientDB.
func (f *freezer) AncientOffSet() uint64 {
return atomic.LoadUint64(&f.offset)
}
// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
if table := f.tables[kind]; table != nil {
@ -216,23 +228,23 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td
}
}()
// Inject all the components into the relevant data tables
if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
if err := f.tables[freezerHashTable].Append(f.frozen-f.offset, hash[:]); err != nil {
log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
if err := f.tables[freezerHeaderTable].Append(f.frozen-f.offset, header); err != nil {
log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil {
if err := f.tables[freezerBodiesTable].Append(f.frozen-f.offset, body); err != nil {
log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
if err := f.tables[freezerReceiptTable].Append(f.frozen-f.offset, receipts); err != nil {
log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
return err
}
if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
if err := f.tables[freezerDifficultyTable].Append(f.frozen-f.offset, td); err != nil {
log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
return err
}
@ -249,7 +261,7 @@ func (f *freezer) TruncateAncients(items uint64) error {
return nil
}
for _, table := range f.tables {
if err := table.truncate(items); err != nil {
if err := table.truncate(items - f.offset); err != nil {
return err
}
}

@ -69,6 +69,12 @@ var (
// fastTxLookupLimitKey tracks the transaction lookup limit during fast sync.
fastTxLookupLimitKey = []byte("FastTransactionLookupLimit")
//offSet of new updated ancientDB.
offSetOfCurrentAncientFreezer = []byte("offSetOfCurrentAncientFreezer")
//offSet of the ancientDB before updated version.
offSetOfLastAncientFreezer = []byte("offSetOfLastAncientFreezer")
// badBlockKey tracks the list of bad blocks seen by local
badBlockKey = []byte("InvalidBlock")

@ -68,6 +68,16 @@ func (t *table) Ancients() (uint64, error) {
return t.db.Ancients()
}
// ItemAmountInAncient returns the actual length of current ancientDB.
func (t *table) ItemAmountInAncient() (uint64, error) {
return t.db.ItemAmountInAncient()
}
// AncientOffSet returns the offset of current ancientDB.
func (t *table) AncientOffSet() uint64 {
return t.db.AncientOffSet()
}
// AncientSize is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) AncientSize(kind string) (uint64, error) {

@ -27,7 +27,10 @@ import (
"strings"
"time"
"github.com/prometheus/tsdb/fileutil"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
@ -35,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@ -85,6 +89,14 @@ type Pruner struct {
triesInMemory uint64
}
type BlockPruner struct {
db ethdb.Database
oldAncientPath string
newAncientPath string
node *node.Node
BlockAmountReserved uint64
}
// NewPruner creates the pruner instance.
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, triesInMemory uint64) (*Pruner, error) {
headBlock := rawdb.ReadHeadBlock(db)
@ -101,6 +113,7 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie
bloomSize = 256
}
stateBloom, err := newStateBloomWithSize(bloomSize)
if err != nil {
return nil, err
}
@ -115,6 +128,16 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie
}, nil
}
func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner {
return &BlockPruner{
db: db,
oldAncientPath: oldAncientPath,
newAncientPath: newAncientPath,
node: n,
BlockAmountReserved: BlockAmountReserved,
}
}
func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, middleStateRoots map[common.Hash]struct{}, start time.Time) error {
// Delete all stale trie nodes in the disk. With the help of state bloom
// the trie nodes(and codes) belong to the active state will be filtered
@ -233,6 +256,194 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta
return nil
}
func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
// Open old db wrapper.
chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt)
if err != nil {
log.Error("Failed to open ancient database", "err=", err)
return err
}
defer chainDb.Close()
log.Info("chainDB opened successfully")
// Get the number of items in old ancient db.
itemsOfAncient, err := chainDb.ItemAmountInAncient()
log.Info("the number of items in ancientDB is ", "itemsOfAncient", itemsOfAncient)
// If we can't access the freezer or it's empty, abort.
if err != nil || itemsOfAncient == 0 {
log.Error("can't access the freezer or it's empty, abort")
return errors.New("can't access the freezer or it's empty, abort")
}
// If the items in freezer is less than the block amount that we want to reserve, it is not enough, should stop.
if itemsOfAncient < p.BlockAmountReserved {
log.Error("the number of old blocks is not enough to reserve,", "ancient items", itemsOfAncient, "the amount specified", p.BlockAmountReserved)
return errors.New("the number of old blocks is not enough to reserve")
}
var oldOffSet uint64
if interrupt {
// The interrupt scecario within this function is specific for old and new ancientDB exsisted concurrently,
// should use last version of offset for oldAncientDB, because current offset is
// actually of the new ancientDB_Backup, but what we want is the offset of ancientDB being backup.
oldOffSet = rawdb.ReadOffSetOfLastAncientFreezer(chainDb)
} else {
// Using current version of ancientDB for oldOffSet because the db for backup is current version.
oldOffSet = rawdb.ReadOffSetOfCurrentAncientFreezer(chainDb)
}
log.Info("the oldOffSet is ", "oldOffSet", oldOffSet)
// Get the start BlockNumber for pruning.
startBlockNumber := oldOffSet + itemsOfAncient - p.BlockAmountReserved
log.Info("new offset/new startBlockNumber is ", "new offset", startBlockNumber)
// Create new ancientdb backup and record the new and last version of offset in kvDB as well.
// For every round, newoffset actually equals to the startBlockNumber in ancient backup db.
frdbBack, err := rawdb.NewFreezerDb(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber)
if err != nil {
log.Error("Failed to create ancient freezer backup", "err=", err)
return err
}
defer frdbBack.Close()
offsetBatch := chainDb.NewBatch()
rawdb.WriteOffSetOfCurrentAncientFreezer(offsetBatch, startBlockNumber)
rawdb.WriteOffSetOfLastAncientFreezer(offsetBatch, oldOffSet)
if err := offsetBatch.Write(); err != nil {
log.Crit("Failed to write offset into disk", "err", err)
}
// It's guaranteed that the old/new offsets are updated as well as the new ancientDB are created if this flock exist.
lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
if err != nil {
log.Error("file lock error", "err", err)
return err
}
log.Info("prune info", "old offset", oldOffSet, "number of items in ancientDB", itemsOfAncient, "amount to reserve", p.BlockAmountReserved)
log.Info("new offset/new startBlockNumber recorded successfully ", "new offset", startBlockNumber)
start := time.Now()
// All ancient data after and including startBlockNumber should write into new ancientDB ancient_back.
for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffSet; blockNumber++ {
blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber)
block := rawdb.ReadBlock(chainDb, blockHash, blockNumber)
receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber)
// Calculate the total difficulty of the block
td := rawdb.ReadTd(chainDb, blockHash, blockNumber)
if td == nil {
return consensus.ErrUnknownAncestor
}
// Write into new ancient_back db.
rawdb.WriteAncientBlock(frdbBack, block, receipts, td)
// Print the log every 5s for better trace.
if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) {
log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber)
start = time.Now()
}
}
lock.Release()
log.Info("block back up done", "current start blockNumber in ancientDB", startBlockNumber)
return nil
}
// Backup the ancient data for the old ancient db, i.e. the most recent 128 blocks in ancient db.
func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
start := time.Now()
if err := p.backUpOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil {
return err
}
log.Info("Block pruning BackUp successfully", "time duration since start is", common.PrettyDuration(time.Since(start)))
return nil
}
func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error {
log.Info("RecoverInterruption for block prune")
newExist, err := CheckFileExist(p.newAncientPath)
if err != nil {
log.Error("newAncientDb path error")
return err
}
if newExist {
log.Info("New ancientDB_backup existed in interruption scenario")
flockOfAncientBack, err := CheckFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
if err != nil {
log.Error("Failed to check flock of ancientDB_Back %v", err)
return err
}
// Indicating both old and new ancientDB existed concurrently.
// Delete directly for the new ancientdb to prune from start, e.g.: path ../chaindb/ancient_backup
if err := os.RemoveAll(p.newAncientPath); err != nil {
log.Error("Failed to remove old ancient directory %v", err)
return err
}
if flockOfAncientBack {
// Indicating the oldOffset/newOffset have already been updated.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, true); err != nil {
log.Error("Failed to prune")
return err
}
} else {
// Indicating the flock did not exist and the new offset did not be updated, so just handle this case as usual.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil {
log.Error("Failed to prune")
return err
}
}
if err := p.AncientDbReplacer(); err != nil {
log.Error("Failed to replace ancientDB")
return err
}
} else {
log.Info("New ancientDB_backup did not exist in interruption scenario")
// Indicating new ancientDB even did not be created, just prune starting at backup from startBlockNumber as usual,
// in this case, the new offset have not been written into kvDB.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil {
log.Error("Failed to prune")
return err
}
if err := p.AncientDbReplacer(); err != nil {
log.Error("Failed to replace ancientDB")
return err
}
}
return nil
}
func CheckFileExist(path string) (bool, error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
// Indicating the file didn't exist.
return false, nil
}
return true, err
}
return true, nil
}
func (p *BlockPruner) AncientDbReplacer() error {
// Delete directly for the old ancientdb, e.g.: path ../chaindb/ancient
if err := os.RemoveAll(p.oldAncientPath); err != nil {
log.Error("Failed to remove old ancient directory %v", err)
return err
}
// Rename the new ancientdb path same to the old
if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil {
log.Error("Failed to rename new ancient directory")
return err
}
return nil
}
// Prune deletes all historical state nodes except the nodes belong to the
// specified state version. If user doesn't specify the state version, use
// the bottom-most snapshot diff layer as the target.

@ -599,10 +599,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.ancientLimit = 0
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
itemAmountInAncient, _ := d.stateDB.ItemAmountInAncient()
// If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly.
if origin >= frozen && frozen != 0 {
if origin >= frozen && itemAmountInAncient != 0 {
d.ancientLimit = 0
log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
} else if d.ancientLimit > 0 {

@ -81,6 +81,12 @@ type AncientReader interface {
// AncientSize returns the ancient size of the specified category.
AncientSize(kind string) (uint64, error)
// ItemAmountInAncient returns the actual length of current ancientDB.
ItemAmountInAncient() (uint64, error)
// AncientOffSet returns the offset of current ancientDB.
AncientOffSet() uint64
}
// AncientWriter contains the methods required to write to immutable ancient data.

@ -27,6 +27,8 @@ import (
"strings"
"sync"
"github.com/prometheus/tsdb/fileutil"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
@ -35,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/tsdb/fileutil"
)
// Node is a container on which services can be registered.
@ -586,7 +587,7 @@ func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, di
if persistDiff {
chainDataHandles = handles * chainDataHandlesPercentage / 100
}
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly)
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly, false, false)
if err != nil {
return nil, err
}
@ -606,7 +607,7 @@ func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, di
// also attaching a chain freezer to it that moves ancient chain data from the
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly bool) (ethdb.Database, error) {
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
@ -625,7 +626,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer,
case !filepath.IsAbs(freezer):
freezer = n.ResolvePath(freezer)
}
db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly)
db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly, disableFreeze, isLastOffset)
}
if err == nil {

@ -22,5 +22,5 @@ import "reflect"
// byteArrayBytes returns a slice of the byte array v.
func byteArrayBytes(v reflect.Value) []byte {
return v.Slice(0, v.Len()).Bytes()
return v.Slice(0, v.Len()).Bytes()
}

@ -28,6 +28,7 @@ import (
gnark "github.com/consensys/gnark-crypto/ecc/bls12-381"
"github.com/consensys/gnark-crypto/ecc/bls12-381/fp"
"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/ethereum/go-ethereum/crypto/bls12381"
)
@ -159,7 +160,7 @@ func FuzzCrossG1MultiExp(data []byte) int {
gethPoints = append(gethPoints, new(bls12381.PointG1).Set(kp1))
gnarkPoints = append(gnarkPoints, *cp1)
}
if len(gethScalars) == 0{
if len(gethScalars) == 0 {
return 0
}
// compute multi exponentiation