fix: the bug of blobsidecars and downloader with multi-database (#2564)
commit 87e622e51f
parent 13d454796f
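
With a multi-database node, chain data is split across BlockStore, StateStore, and ChainStore, and the bug fixed here is that several code paths still went through the wrong store. This change routes blob sidecars and all other ancient-store access through the block store (ReadBlobSidecarsRLP now calls BlockStoreReader().ReadAncients), points the downloader's ancient counters and table sizes at BlockStore(), and makes the snap syncer read trie nodes and allocate its generator batches against the state store. The meta accessors (ReadHeaderNumber, ReadHeadHeaderHash, ReadHeadBlockHash, ReadHeadFastBlockHash, ReadFinalizedBlockHash) now take an ethdb.MultiDatabaseReader and resolve the block store themselves, so call sites simply pass the top-level database. ReadChainMetadataFromMultiDatabase is folded into ReadChainMetadata, and NewDatabaseWithFreezer pre-creates its freezerdb so the validation paths can print chain metadata through a multi-database-aware reader.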
@@ -64,6 +64,7 @@ var (
 			utils.CachePreimagesFlag,
 			utils.OverrideBohr,
 			utils.OverrideVerkle,
+			utils.MultiDataBaseFlag,
 		}, utils.DatabaseFlags),
 		Description: `
The init command initializes a new genesis block and definition for the network.
@@ -759,7 +760,7 @@ func parseDumpConfig(ctx *cli.Context, stack *node.Node) (*state.DumpConfig, eth
 	arg := ctx.Args().First()
 	if hashish(arg) {
 		hash := common.HexToHash(arg)
-		if number := rawdb.ReadHeaderNumber(db.BlockStore(), hash); number != nil {
+		if number := rawdb.ReadHeaderNumber(db, hash); number != nil {
 			header = rawdb.ReadHeader(db, hash, *number)
 		} else {
 			return nil, nil, common.Hash{}, fmt.Errorf("block %x not found", hash)
@@ -397,8 +397,8 @@ func inspectTrie(ctx *cli.Context) error {
 		var headerBlockHash common.Hash
 		if ctx.NArg() >= 1 {
 			if ctx.Args().Get(0) == "latest" {
-				headerHash := rawdb.ReadHeadHeaderHash(db.BlockStore())
-				blockNumber = *(rawdb.ReadHeaderNumber(db.BlockStore(), headerHash))
+				headerHash := rawdb.ReadHeadHeaderHash(db)
+				blockNumber = *(rawdb.ReadHeaderNumber(db, headerHash))
 			} else if ctx.Args().Get(0) == "snapshot" {
 				trieRootHash = rawdb.ReadSnapshotRoot(db)
 				blockNumber = math.MaxUint64
@@ -1212,7 +1212,7 @@ func showMetaData(ctx *cli.Context) error {
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Error accessing ancients: %v", err)
 	}
-	data := rawdb.ReadChainMetadataFromMultiDatabase(db)
+	data := rawdb.ReadChainMetadata(db)
 	data = append(data, []string{"frozen", fmt.Sprintf("%d items", ancients)})
 	data = append(data, []string{"snapshotGenerator", snapshot.ParseGeneratorStatus(rawdb.ReadSnapshotGenerator(db))})
 	if b := rawdb.ReadHeadBlock(db); b != nil {
@@ -1255,7 +1255,7 @@ func hbss2pbss(ctx *cli.Context) error {
 	defer stack.Close()

 	db := utils.MakeChainDatabase(ctx, stack, false, false)
-	db.Sync()
+	db.BlockStore().Sync()
 	stateDiskDb := db.StateStore()
 	defer db.Close()

@@ -1273,8 +1273,8 @@ func hbss2pbss(ctx *cli.Context) error {
 	log.Info("hbss2pbss triedb", "scheme", triedb.Scheme())
 	defer triedb.Close()

-	headerHash := rawdb.ReadHeadHeaderHash(db.BlockStore())
-	blockNumber := rawdb.ReadHeaderNumber(db.BlockStore(), headerHash)
+	headerHash := rawdb.ReadHeadHeaderHash(db)
+	blockNumber := rawdb.ReadHeaderNumber(db, headerHash)
 	if blockNumber == nil {
 		log.Error("read header number failed.")
 		return fmt.Errorf("read header number failed")
@@ -125,6 +125,7 @@ var (
 		utils.CacheSnapshotFlag,
 		// utils.CacheNoPrefetchFlag,
 		utils.CachePreimagesFlag,
+		utils.MultiDataBaseFlag,
 		utils.PersistDiffFlag,
 		utils.DiffBlockFlag,
 		utils.PruneAncientDataFlag,
@@ -1153,7 +1153,6 @@ var (
 		DBEngineFlag,
 		StateSchemeFlag,
 		HttpHeaderFlag,
-		MultiDataBaseFlag,
 	}
 )

@@ -462,8 +462,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 		}
 	}
 	// Ensure that a previous crash in SetHead doesn't leave extra ancients
-	if frozen, err := bc.db.ItemAmountInAncient(); err == nil && frozen > 0 {
-		frozen, err = bc.db.Ancients()
+	if frozen, err := bc.db.BlockStore().ItemAmountInAncient(); err == nil && frozen > 0 {
+		frozen, err = bc.db.BlockStore().Ancients()
 		if err != nil {
 			return nil, err
 		}
@@ -663,7 +663,7 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, diffLayerCh cha
 // into node seamlessly.
 func (bc *BlockChain) empty() bool {
 	genesis := bc.genesisBlock.Hash()
-	for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db.BlockStore())} {
+	for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} {
 		if hash != genesis {
 			return false
 		}
@@ -699,7 +699,7 @@ func (bc *BlockChain) getFinalizedNumber(header *types.Header) uint64 {
 // assumes that the chain manager mutex is held.
 func (bc *BlockChain) loadLastState() error {
 	// Restore the last known head block
-	head := rawdb.ReadHeadBlockHash(bc.db.BlockStore())
+	head := rawdb.ReadHeadBlockHash(bc.db)
 	if head == (common.Hash{}) {
 		// Corrupt or empty database, init from scratch
 		log.Warn("Empty database, resetting chain")
@@ -721,7 +721,7 @@ func (bc *BlockChain) loadLastState() error {

 	// Restore the last known head header
 	headHeader := headBlock.Header()
-	if head := rawdb.ReadHeadHeaderHash(bc.db.BlockStore()); head != (common.Hash{}) {
+	if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
 		if header := bc.GetHeaderByHash(head); header != nil {
 			headHeader = header
 		}
@@ -732,7 +732,7 @@ func (bc *BlockChain) loadLastState() error {
 	bc.currentSnapBlock.Store(headBlock.Header())
 	headFastBlockGauge.Update(int64(headBlock.NumberU64()))

-	if head := rawdb.ReadHeadFastBlockHash(bc.db.BlockStore()); head != (common.Hash{}) {
+	if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
 		if block := bc.GetBlockByHash(head); block != nil {
 			bc.currentSnapBlock.Store(block.Header())
 			headFastBlockGauge.Update(int64(block.NumberU64()))
@@ -1100,7 +1100,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 			// intent afterwards is full block importing, delete the chain segment
 			// between the stateful-block and the sethead target.
 			var wipe bool
-			frozen, _ := bc.db.Ancients()
+			frozen, _ := bc.db.BlockStore().Ancients()
 			if headNumber+1 < frozen {
 				wipe = pivot == nil || headNumber >= *pivot
 			}
@@ -1109,11 +1109,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 	// Rewind the header chain, deleting all block bodies until then
 	delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
 		// Ignore the error here since light client won't hit this path
-		frozen, _ := bc.db.Ancients()
+		frozen, _ := bc.db.BlockStore().Ancients()
 		if num+1 <= frozen {
 			// Truncate all relative data(header, total difficulty, body, receipt
 			// and canonical hash) from ancient store.
-			if _, err := bc.db.TruncateHead(num); err != nil {
+			if _, err := bc.db.BlockStore().TruncateHead(num); err != nil {
 				log.Crit("Failed to truncate ancient data", "number", num, "err", err)
 			}
 			// Remove the hash <-> number mapping from the active store.
@@ -1556,9 +1556,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

 		// Ensure genesis is in ancients.
 		if first.NumberU64() == 1 {
-			if frozen, _ := bc.db.Ancients(); frozen == 0 {
+			if frozen, _ := bc.db.BlockStore().Ancients(); frozen == 0 {
 				td := bc.genesisBlock.Difficulty()
-				writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
+				writeSize, err := rawdb.WriteAncientBlocks(bc.db.BlockStore(), []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
 				if err != nil {
 					log.Error("Error writing genesis to ancients", "err", err)
 					return 0, err
@@ -1576,7 +1576,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

 		// Write all chain data to ancients.
 		td := bc.GetTd(first.Hash(), first.NumberU64())
-		writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td)
+		writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db.BlockStore(), blockChain, receiptChain, td)
 		if err != nil {
 			log.Error("Error importing chain data to ancients", "err", err)
 			return 0, err
@@ -1584,7 +1584,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 		size += writeSize

 		// Sync the ancient store explicitly to ensure all data has been flushed to disk.
-		if err := bc.db.Sync(); err != nil {
+		if err := bc.db.BlockStore().Sync(); err != nil {
 			return 0, err
 		}
 		// Update the current snap block because all block data is now present in DB.
@@ -1592,7 +1592,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 		if !updateHead(blockChain[len(blockChain)-1]) {
 			// We end up here if the header chain has reorg'ed, and the blocks/receipts
 			// don't match the canonical chain.
-			if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil {
+			if _, err := bc.db.BlockStore().TruncateHead(previousSnapBlock + 1); err != nil {
 				log.Error("Can't truncate ancient store after failed insert", "err", err)
 			}
 			return 0, errSideChainReceipts
@@ -1612,7 +1612,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 			rawdb.DeleteBlockWithoutNumber(blockBatch, block.Hash(), block.NumberU64())
 		}
 		// Delete side chain hash-to-number mappings.
-		for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
+		for _, nh := range rawdb.ReadAllHashesInRange(bc.db.BlockStore(), first.NumberU64(), last.NumberU64()) {
 			if _, canon := canonHashes[nh.Hash]; !canon {
 				rawdb.DeleteHeader(blockBatch, nh.Hash, nh.Number)
 			}
@@ -231,7 +231,7 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
 	if receipts, ok := bc.receiptsCache.Get(hash); ok {
 		return receipts
 	}
-	number := rawdb.ReadHeaderNumber(bc.db.BlockStore(), hash)
+	number := rawdb.ReadHeaderNumber(bc.db, hash)
 	if number == nil {
 		return nil
 	}
@@ -514,7 +514,7 @@ func (bc *BlockChain) SubscribeFinalizedHeaderEvent(ch chan<- FinalizedHeaderEve

 // AncientTail retrieves the tail the ancients blocks
 func (bc *BlockChain) AncientTail() (uint64, error) {
-	tail, err := bc.db.Tail()
+	tail, err := bc.db.BlockStore().Tail()
 	if err != nil {
 		return 0, err
 	}
@@ -227,8 +227,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH
 				// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
 				// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

-				if rawdb.ReadCanonicalHash(c.chainDb.BlockStore(), prevHeader.Number.Uint64()) != prevHash {
-					if h := rawdb.FindCommonAncestor(c.chainDb.BlockStore(), prevHeader, header); h != nil {
+				if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
+					if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
 						c.newHead(h.Number.Uint64(), true)
 					}
 				}
@@ -97,7 +97,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
 		return nil, ErrNoGenesis
 	}
 	hc.currentHeader.Store(hc.genesisHeader)
-	if head := rawdb.ReadHeadBlockHash(chainDb.BlockStore()); head != (common.Hash{}) {
+	if head := rawdb.ReadHeadBlockHash(chainDb); head != (common.Hash{}) {
 		if chead := hc.GetHeaderByHash(head); chead != nil {
 			hc.currentHeader.Store(chead)
 		}
@@ -144,7 +144,7 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
 	if cached, ok := hc.numberCache.Get(hash); ok {
 		return &cached
 	}
-	number := rawdb.ReadHeaderNumber(hc.chainDb.BlockStore(), hash)
+	number := rawdb.ReadHeaderNumber(hc.chainDb, hash)
 	if number != nil {
 		hc.numberCache.Add(hash, *number)
 	}
@@ -691,7 +691,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
 		// we don't end up with dangling daps in the database
 		var nums []uint64
 		if origin {
-			for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb, n)) > 0; n++ {
+			for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb.BlockStore(), n)) > 0; n++ {
 				nums = append([]uint64{n}, nums...) // suboptimal, but we don't really expect this path
 			}
 			origin = false
@@ -701,7 +701,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
 		// Remove the related data from the database on all sidechains
 		for _, num := range nums {
 			// Gather all the side fork hashes
-			hashes := rawdb.ReadAllHashes(hc.chainDb, num)
+			hashes := rawdb.ReadAllHashes(hc.chainDb.BlockStore(), num)
 			if len(hashes) == 0 {
 				// No hashes in the database whatsoever, probably frozen already
 				hashes = append(hashes, hdr.Hash())
@@ -34,6 +34,15 @@ import (
 	"golang.org/x/exp/slices"
 )

+// Support Multi-Database Based on Data Pattern, the Chaindata will be divided into three stores: BlockStore, StateStore, and ChainStore,
+// according to data schema and read/write behavior. When using the following data interfaces, you should take note of the following:
+//
+// 1) Block-Related Data: For CanonicalHash, Header, Body, Td, Receipts, and BlobSidecars, the Write, Delete, and Iterator
+// operations should carefully ensure that the database being used is BlockStore.
+// 2) Meta-Related Data: For HeaderNumber, HeadHeaderHash, HeadBlockHash, HeadFastBlockHash, and FinalizedBlockHash, the
+// Write and Delete operations should carefully ensure that the database being used is BlockStore.
+// 3) Ancient Data: When using a multi-database, Ancient data will use the BlockStore.
+
 // ReadCanonicalHash retrieves the hash assigned to a canonical block number.
 func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
 	var data []byte
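
The three rules above are easiest to see side by side. The sketch below is editorial, not part of the commit; it assumes the bsc ethdb.Database interface exposes BlockStore() and the rawdb helpers exactly as they appear throughout this diff.

    package example

    import (
        "github.com/ethereum/go-ethereum/core/rawdb"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/ethdb"
    )

    // storeAndReadBack follows the three rules: block-related writes target
    // BlockStore(), meta reads take the top-level database (the accessors
    // resolve BlockStoreReader() internally), and ancient data is served by
    // the block store.
    func storeAndReadBack(db ethdb.Database, block *types.Block) {
        batch := db.BlockStore().NewBatch() // rule 1: block writes go to BlockStore
        rawdb.WriteBlock(batch, block)
        _ = batch.Write()

        number := rawdb.ReadHeaderNumber(db, block.Hash()) // rule 2: pass the top-level db
        _ = number

        frozen, _ := db.BlockStore().Ancients() // rule 3: ancients live in BlockStore
        _ = frozen
    }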
@@ -144,8 +153,8 @@ func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int
 }

 // ReadHeaderNumber returns the header number assigned to a hash.
-func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
-	data, _ := db.Get(headerNumberKey(hash))
+func ReadHeaderNumber(db ethdb.MultiDatabaseReader, hash common.Hash) *uint64 {
+	data, _ := db.BlockStoreReader().Get(headerNumberKey(hash))
 	if len(data) != 8 {
 		return nil
 	}
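
ethdb.MultiDatabaseReader is not defined in this diff. From its usage here and in the Read*Hash accessors below, it must at least expose a block-store reader; a hedged sketch of the assumed shape:

    // Reconstructed from usage in this diff; the real ethdb definition may
    // carry additional methods (e.g. plain key-value reads).
    type MultiDatabaseReader interface {
        BlockStoreReader() ethdb.Reader // reader backed by the block store
    }

Because ethdb.Reader values (and full databases) evidently provide BlockStoreReader(), call sites shrink from rawdb.ReadHeaderNumber(db.BlockStore(), hash) to rawdb.ReadHeaderNumber(db, hash) throughout the diff.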
@@ -170,8 +179,8 @@ func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) {
 }

 // ReadHeadHeaderHash retrieves the hash of the current canonical head header.
-func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash {
-	data, _ := db.Get(headHeaderKey)
+func ReadHeadHeaderHash(db ethdb.MultiDatabaseReader) common.Hash {
+	data, _ := db.BlockStoreReader().Get(headHeaderKey)
 	if len(data) == 0 {
 		return common.Hash{}
 	}
@@ -186,8 +195,8 @@ func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) {
 }

 // ReadHeadBlockHash retrieves the hash of the current canonical head block.
-func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash {
-	data, _ := db.Get(headBlockKey)
+func ReadHeadBlockHash(db ethdb.MultiDatabaseReader) common.Hash {
+	data, _ := db.BlockStoreReader().Get(headBlockKey)
 	if len(data) == 0 {
 		return common.Hash{}
 	}
@@ -202,8 +211,8 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
 }

 // ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block.
-func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash {
-	data, _ := db.Get(headFastBlockKey)
+func ReadHeadFastBlockHash(db ethdb.MultiDatabaseReader) common.Hash {
+	data, _ := db.BlockStoreReader().Get(headFastBlockKey)
 	if len(data) == 0 {
 		return common.Hash{}
 	}
@@ -218,8 +227,8 @@ func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
 }

 // ReadFinalizedBlockHash retrieves the hash of the finalized block.
-func ReadFinalizedBlockHash(db ethdb.KeyValueReader) common.Hash {
-	data, _ := db.Get(headFinalizedBlockKey)
+func ReadFinalizedBlockHash(db ethdb.MultiDatabaseReader) common.Hash {
+	data, _ := db.BlockStoreReader().Get(headFinalizedBlockKey)
 	if len(data) == 0 {
 		return common.Hash{}
 	}
@@ -297,7 +306,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
 		// It's ok to request block 0, 1 item
 		count = number + 1
 	}
-	limit, _ := db.Ancients()
+	limit, _ := db.BlockStoreReader().Ancients()
 	// First read live blocks
 	if i >= limit {
 		// If we need to read live blocks, we need to figure out the hash first
@@ -317,7 +326,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
 		return rlpHeaders
 	}
 	// read remaining from ancients, cap at 2M
-	data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024)
+	data, err := db.BlockStoreReader().AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024)
 	if err != nil {
 		log.Error("Failed to read headers from freezer", "err", err)
 		return rlpHeaders
@@ -468,7 +477,7 @@ func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
 		// Block is not in ancients, read from leveldb by hash and number.
 		// Note: ReadCanonicalHash cannot be used here because it also
 		// calls ReadAncients internally.
-		hash, _ := db.Get(headerHashKey(number))
+		hash, _ := db.BlockStoreReader().Get(headerHashKey(number))
 		data, _ = db.BlockStoreReader().Get(blockBodyKey(number, common.BytesToHash(hash)))
 		return nil
 	})
@@ -516,6 +525,13 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t
 	WriteBodyRLP(db, hash, number, data)
 }

+// DeleteBody removes all block body data associated with a hash.
+func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
+	if err := db.Delete(blockBodyKey(number, hash)); err != nil {
+		log.Crit("Failed to delete block body", "err", err)
+	}
+}
+
 func WriteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash, layer *types.DiffLayer) {
 	data, err := rlp.EncodeToBytes(layer)
 	if err != nil {
@@ -554,13 +570,6 @@ func DeleteDiffLayer(db ethdb.KeyValueWriter, blockHash common.Hash) {
 	}
 }

-// DeleteBody removes all block body data associated with a hash.
-func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
-	if err := db.Delete(blockBodyKey(number, hash)); err != nil {
-		log.Crit("Failed to delete block body", "err", err)
-	}
-}
-
 // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
 func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
 	var data []byte
@@ -884,7 +893,7 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
 // ReadBlobSidecarsRLP retrieves all the transaction blobs belonging to a block in RLP encoding.
 func ReadBlobSidecarsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
 	var data []byte
-	db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
+	db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error {
 		// Check if the data is in ancients
 		if isCanon(reader, number, hash) {
 			data, _ = reader.Ancient(ChainFreezerBlobSidecarTable, number)
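
This is the blob-sidecar half of the fix named in the commit title: ReadBlobSidecarsRLP previously called ReadAncients on the top-level database, which under a multi-database layout is not where the ancient tables live (rule 3 above). Routing the lookup through BlockStoreReader() reads the ChainFreezerBlobSidecarTable from the block store's freezer, where the sidecars are actually written.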
@@ -1093,24 +1102,24 @@ func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header {

 // ReadHeadHeader returns the current canonical head header.
 func ReadHeadHeader(db ethdb.Reader) *types.Header {
-	headHeaderHash := ReadHeadHeaderHash(db.BlockStoreReader())
+	headHeaderHash := ReadHeadHeaderHash(db)
 	if headHeaderHash == (common.Hash{}) {
 		return nil
 	}
-	headHeaderNumber := ReadHeaderNumber(db.BlockStoreReader(), headHeaderHash)
+	headHeaderNumber := ReadHeaderNumber(db, headHeaderHash)
 	if headHeaderNumber == nil {
 		return nil
 	}
-	return ReadHeader(db.BlockStoreReader(), headHeaderHash, *headHeaderNumber)
+	return ReadHeader(db, headHeaderHash, *headHeaderNumber)
 }

 // ReadHeadBlock returns the current canonical head block.
 func ReadHeadBlock(db ethdb.Reader) *types.Block {
-	headBlockHash := ReadHeadBlockHash(db.BlockStoreReader())
+	headBlockHash := ReadHeadBlockHash(db)
 	if headBlockHash == (common.Hash{}) {
 		return nil
 	}
-	headBlockNumber := ReadHeaderNumber(db.BlockStoreReader(), headBlockHash)
+	headBlockNumber := ReadHeaderNumber(db, headBlockHash)
 	if headBlockNumber == nil {
 		return nil
 	}
@@ -42,7 +42,7 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
 	}
 	// Database v4-v5 tx lookup format just stores the hash
 	if len(data) == common.HashLength {
-		return ReadHeaderNumber(db.BlockStoreReader(), common.BytesToHash(data))
+		return ReadHeaderNumber(db, common.BytesToHash(data))
 	}
 	// Finally try database v3 tx lookup format
 	var entry LegacyTxLookupEntry
@@ -92,7 +92,7 @@ func (f *chainFreezer) Close() error {

 // readHeadNumber returns the number of chain head block. 0 is returned if the
 // block is unknown or not available yet.
-func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 {
+func (f *chainFreezer) readHeadNumber(db ethdb.Reader) uint64 {
 	hash := ReadHeadBlockHash(db)
 	if hash == (common.Hash{}) {
 		log.Error("Head block is not reachable")
@@ -108,7 +108,7 @@ func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 {

 // readFinalizedNumber returns the number of finalized block. 0 is returned
 // if the block is unknown or not available yet.
-func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 {
+func (f *chainFreezer) readFinalizedNumber(db ethdb.Reader) uint64 {
 	hash := ReadFinalizedBlockHash(db)
 	if hash == (common.Hash{}) {
 		return 0
@@ -123,7 +123,7 @@ func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 {

 // freezeThreshold returns the threshold for chain freezing. It's determined
 // by formula: max(finality, HEAD-params.FullImmutabilityThreshold).
-func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) {
+func (f *chainFreezer) freezeThreshold(db ethdb.Reader) (uint64, error) {
 	var (
 		head  = f.readHeadNumber(db)
 		final = f.readFinalizedNumber(db)
@@ -35,16 +35,16 @@ import (
 // injects into the database the block hash->number mappings.
 func InitDatabaseFromFreezer(db ethdb.Database) {
 	// If we can't access the freezer or it's empty, abort
-	frozen, err := db.ItemAmountInAncient()
+	frozen, err := db.BlockStore().ItemAmountInAncient()
 	if err != nil || frozen == 0 {
 		return
 	}
 	var (
-		batch  = db.NewBatch()
+		batch  = db.BlockStore().NewBatch()
 		start  = time.Now()
 		logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
 		hash   common.Hash
-		offset = db.AncientOffSet()
+		offset = db.BlockStore().AncientOffSet()
 	)
 	for i := uint64(0) + offset; i < frozen+offset; i++ {
 		// We read 100K hashes at a time, for a total of 3.2M
@@ -52,7 +52,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
 		if i+count > frozen+offset {
 			count = frozen + offset - i
 		}
-		data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count)
+		data, err := db.BlockStore().AncientRange(ChainFreezerHashTable, i, count, 32*count)
 		if err != nil {
 			log.Crit("Failed to init database from freezer", "err", err)
 		}
@@ -100,7 +100,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
 		number uint64
 		rlp    rlp.RawValue
 	}
-	if offset := db.AncientOffSet(); offset > from {
+	if offset := db.BlockStore().AncientOffSet(); offset > from {
 		from = offset
 	}
 	if to <= from {
@@ -122,7 +122,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
 		}
 		defer close(rlpCh)
 		for n != end {
-			data := ReadCanonicalBodyRLP(db.BlockStore(), n)
+			data := ReadCanonicalBodyRLP(db, n)
 			// Feed the block to the aggregator, or abort on interrupt
 			select {
 			case rlpCh <- &numberRlp{n, data}:
@@ -187,7 +187,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
 // signal received.
 func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
 	// short circuit for invalid range
-	if offset := db.AncientOffSet(); offset > from {
+	if offset := db.BlockStore().AncientOffSet(); offset > from {
 		from = offset
 	}
 	if from >= to {
@@ -286,7 +286,7 @@ func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, inte
 // signal received.
 func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
 	// short circuit for invalid range
-	if offset := db.AncientOffSet(); offset > from {
+	if offset := db.BlockStore().AncientOffSet(); offset > from {
 		from = offset
 	}
 	if from >= to {
@@ -61,8 +61,10 @@ func (frdb *freezerdb) BlockStoreReader() ethdb.Reader {
 }

 func (frdb *freezerdb) BlockStoreWriter() ethdb.Writer {
-	// TODO implement me
-	panic("implement me")
+	if frdb.blockStore == nil {
+		return frdb
+	}
+	return frdb.blockStore
 }

 // AncientDatadir returns the path of root ancient directory.
@@ -116,6 +118,13 @@ func (frdb *freezerdb) StateStore() ethdb.Database {
 	return frdb.stateStore
 }

+func (frdb *freezerdb) GetStateStore() ethdb.Database {
+	if frdb.stateStore != nil {
+		return frdb.stateStore
+	}
+	return frdb
+}
+
 func (frdb *freezerdb) SetStateStore(state ethdb.Database) {
 	if frdb.stateStore != nil {
 		frdb.stateStore.Close()
@@ -254,6 +263,13 @@ func (db *nofreezedb) SetStateStore(state ethdb.Database) {
 	db.stateStore = state
 }

+func (db *nofreezedb) GetStateStore() ethdb.Database {
+	if db.stateStore != nil {
+		return db.stateStore
+	}
+	return db
+}
+
 func (db *nofreezedb) StateStoreReader() ethdb.Reader {
 	if db.stateStore != nil {
 		return db.stateStore
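
GetStateStore on freezerdb and nofreezedb follows the same fallback style as BlockStoreWriter above: return the dedicated store when one is configured, otherwise return the database itself. Callers can therefore use it unconditionally; the snap syncer's loadSyncStatus later in this diff does exactly that with stateDiskDB := s.db.GetStateStore().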
@@ -403,6 +419,7 @@ func (db *emptyfreezedb) Sync() error {
 func (db *emptyfreezedb) DiffStore() ethdb.KeyValueStore { return db }
 func (db *emptyfreezedb) SetDiffStore(diff ethdb.KeyValueStore) {}
 func (db *emptyfreezedb) StateStore() ethdb.Database { return db }
+func (db *emptyfreezedb) GetStateStore() ethdb.Database { return db }
 func (db *emptyfreezedb) SetStateStore(state ethdb.Database) {}
 func (db *emptyfreezedb) StateStoreReader() ethdb.Reader { return db }
 func (db *emptyfreezedb) BlockStore() ethdb.Database { return db }
@@ -518,8 +535,17 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st

 	// Create the idle freezer instance
 	frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly, offset, multiDatabase)
+
+	// We are creating the freezerdb here because the validation logic for db and freezer below requires certain interfaces
+	// that need a database type. Therefore, we are pre-creating it for subsequent use.
+	freezerDb := &freezerdb{
+		ancientRoot:    ancient,
+		KeyValueStore:  db,
+		AncientStore:   frdb,
+		AncientFreezer: frdb,
+	}
 	if err != nil {
-		printChainMetadata(db)
+		printChainMetadata(freezerDb)
 		return nil, err
 	}

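
As the added comment says, freezerDb is now built before the validation logic rather than at the end of the function: printChainMetadata takes an ethdb.Reader after this change (see the signature change near the end of this diff), and its head-hash and header-number reads go through the multi-database-aware accessors, which the bare key-value store alone cannot serve. The wrapped freezerdb can.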
@@ -555,10 +581,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
 		// the freezer and the key-value store.
 		frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0)
 		if err != nil {
-			printChainMetadata(db)
+			printChainMetadata(freezerDb)
 			return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err)
 		} else if !bytes.Equal(kvgenesis, frgenesis) {
-			printChainMetadata(db)
+			printChainMetadata(freezerDb)
 			return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis)
 		}
 		// Key-value store and freezer belong to the same network. Ensure that they
@@ -566,7 +592,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
 		if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 {
 			// Subsequent header after the freezer limit is missing from the database.
 			// Reject startup if the database has a more recent head.
-			if head := *ReadHeaderNumber(db, ReadHeadHeaderHash(db)); head > frozen-1 {
+			if head := *ReadHeaderNumber(freezerDb, ReadHeadHeaderHash(freezerDb)); head > frozen-1 {
 				// Find the smallest block stored in the key-value store
 				// in range of [frozen, head]
 				var number uint64
@@ -576,7 +602,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
 					}
 				}
 				// We are about to exit on error. Print database metadata before exiting
-				printChainMetadata(db)
+				printChainMetadata(freezerDb)
 				return nil, fmt.Errorf("gap in the chain between ancients [0 - #%d] and leveldb [#%d - #%d] ",
 					frozen-1, number, head)
 			}
@@ -591,11 +617,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
 		// store, otherwise we'll end up missing data. We check block #1 to decide
 		// if we froze anything previously or not, but do take care of databases with
 		// only the genesis block.
-		if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) {
+		if ReadHeadHeaderHash(freezerDb) != common.BytesToHash(kvgenesis) {
 			// Key-value store contains more data than the genesis block, make sure we
 			// didn't freeze anything yet.
 			if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 {
-				printChainMetadata(db)
+				printChainMetadata(freezerDb)
 				return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path")
 			}
 			// Block #1 is still in the database, we're allowed to init a new freezer
@@ -617,12 +643,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
 			frdb.wg.Done()
 		}()
 	}
-	return &freezerdb{
-		ancientRoot:    ancient,
-		KeyValueStore:  db,
-		AncientStore:   frdb,
-		AncientFreezer: frdb,
-	}, nil
+	return freezerDb, nil
 }

 // NewMemoryDatabase creates an ephemeral in-memory key-value database without a
@@ -1238,7 +1259,7 @@ func DeleteTrieState(db ethdb.Database) error {
 }

 // printChainMetadata prints out chain metadata to stderr.
-func printChainMetadata(db ethdb.KeyValueStore) {
+func printChainMetadata(db ethdb.Reader) {
 	fmt.Fprintf(os.Stderr, "Chain metadata\n")
 	for _, v := range ReadChainMetadata(db) {
 		fmt.Fprintf(os.Stderr, " %s\n", strings.Join(v, ": "))
@@ -1249,7 +1270,7 @@ func printChainMetadata(db ethdb.KeyValueStore) {
 // ReadChainMetadata returns a set of key/value pairs that contains information
 // about the database chain status. This can be used for diagnostic purposes
 // when investigating the state of the node.
-func ReadChainMetadata(db ethdb.KeyValueStore) [][]string {
+func ReadChainMetadata(db ethdb.Reader) [][]string {
 	pp := func(val *uint64) string {
 		if val == nil {
 			return "<nil>"
@@ -1271,26 +1292,3 @@ func ReadChainMetadata(db ethdb.KeyValueStore) [][]string {
 	}
 	return data
 }
-
-func ReadChainMetadataFromMultiDatabase(db ethdb.Database) [][]string {
-	pp := func(val *uint64) string {
-		if val == nil {
-			return "<nil>"
-		}
-		return fmt.Sprintf("%d (%#x)", *val, *val)
-	}
-	data := [][]string{
-		{"databaseVersion", pp(ReadDatabaseVersion(db))},
-		{"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))},
-		{"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db.BlockStore()))},
-		{"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))},
-		{"lastPivotNumber", pp(ReadLastPivotNumber(db))},
-		{"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))},
-		{"snapshotDisabled", fmt.Sprintf("%v", ReadSnapshotDisabled(db))},
-		{"snapshotJournal", fmt.Sprintf("%d bytes", len(ReadSnapshotJournal(db)))},
-		{"snapshotRecoveryNumber", pp(ReadSnapshotRecoveryNumber(db))},
-		{"snapshotRoot", fmt.Sprintf("%v", ReadSnapshotRoot(db))},
-		{"txIndexTail", pp(ReadTxIndexTail(db))},
-	}
-	return data
-}
@@ -251,6 +251,10 @@ func (t *table) SetStateStore(state ethdb.Database) {
 	panic("not implement")
 }

+func (t *table) GetStateStore() ethdb.Database {
+	return nil
+}
+
 func (t *table) StateStoreReader() ethdb.Reader {
 	return nil
 }
@@ -25,7 +25,7 @@ import (
 )

 // NewStateSync creates a new state trie download scheduler.
-func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync {
+func NewStateSync(root common.Hash, database ethdb.Database, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync {
 	// Register the storage slot callback if the external callback is specified.
 	var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error
 	if onLeaf != nil {
@@ -268,7 +268,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool, s
 			}
 		}
 		batch := dstDb.NewBatch()
-		if err := sched.Commit(batch); err != nil {
+		if err := sched.Commit(batch, nil); err != nil {
 			t.Fatalf("failed to commit data: %v", err)
 		}
 		batch.Write()
@@ -369,7 +369,7 @@ func testIterativeDelayedStateSync(t *testing.T, scheme string) {
 			nodeProcessed = len(nodeResults)
 		}
 		batch := dstDb.NewBatch()
-		if err := sched.Commit(batch); err != nil {
+		if err := sched.Commit(batch, nil); err != nil {
 			t.Fatalf("failed to commit data: %v", err)
 		}
 		batch.Write()
@@ -469,7 +469,7 @@ func testIterativeRandomStateSync(t *testing.T, count int, scheme string) {
 			}
 		}
 		batch := dstDb.NewBatch()
-		if err := sched.Commit(batch); err != nil {
+		if err := sched.Commit(batch, nil); err != nil {
 			t.Fatalf("failed to commit data: %v", err)
 		}
 		batch.Write()
@@ -575,7 +575,7 @@ func testIterativeRandomDelayedStateSync(t *testing.T, scheme string) {
 			}
 		}
 		batch := dstDb.NewBatch()
-		if err := sched.Commit(batch); err != nil {
+		if err := sched.Commit(batch, nil); err != nil {
 			t.Fatalf("failed to commit data: %v", err)
 		}
 		batch.Write()
@@ -688,7 +688,7 @@ func testIncompleteStateSync(t *testing.T, scheme string) {
 			}
 		}
 		batch := dstDb.NewBatch()
-		if err := sched.Commit(batch); err != nil {
+		if err := sched.Commit(batch, nil); err != nil {
 			t.Fatalf("failed to commit data: %v", err)
 		}
 		batch.Write()
@@ -558,8 +558,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 	} else {
 		d.ancientLimit = 0
 	}
-	frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
-	itemAmountInAncient, _ := d.stateDB.ItemAmountInAncient()
+	frozen, _ := d.stateDB.BlockStore().Ancients() // Ignore the error here since light client can also hit here.
+	itemAmountInAncient, _ := d.stateDB.BlockStore().ItemAmountInAncient()
 	// If a part of blockchain data has already been written into active store,
 	// disable the ancient style insertion explicitly.
 	if origin >= frozen && itemAmountInAncient != 0 {
@@ -1671,9 +1671,9 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
 	}
 	// Don't report anything until we have a meaningful progress
 	var (
-		headerBytes, _  = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable)
-		bodyBytes, _    = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable)
-		receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable)
+		headerBytes, _  = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerHeaderTable)
+		bodyBytes, _    = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerBodiesTable)
+		receiptBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerReceiptTable)
 	)
 	syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes)
 	if syncedBytes == 0 {
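
And the downloader half of the fix: both the ancient-store queries used to decide between ancient-style and active-store insertion (Ancients, ItemAmountInAncient) and the table sizes used for snap-sync progress reporting (AncientSize) now go through BlockStore(), matching where the freezer lives under the multi-database layout.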
@@ -409,7 +409,7 @@ type SyncPeer interface {
 // - The peer delivers a stale response after a previous timeout
 // - The peer delivers a refusal to serve the requested state
 type Syncer struct {
-	db     ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
+	db     ethdb.Database      // Database to store the trie nodes into (and dedup)
 	scheme string              // Node scheme used in node database

 	root common.Hash // Current state trie root being synced
@@ -478,7 +478,7 @@ type Syncer struct {

 // NewSyncer creates a new snapshot syncer to download the Ethereum state over the
 // snap protocol.
-func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
+func NewSyncer(db ethdb.Database, scheme string) *Syncer {
 	return &Syncer{
 		db:     db,
 		scheme: scheme,
@@ -719,11 +719,11 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 
 // cleanPath is used to remove the dangling nodes in the stackTrie.
 func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
-	if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
+	if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db.StateStoreReader(), path) {
 		rawdb.DeleteAccountTrieNode(batch, path)
 		deletionGauge.Inc(1)
 	}
-	if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
+	if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db.StateStoreReader(), owner, path) {
 		rawdb.DeleteStorageTrieNode(batch, owner, path)
 		deletionGauge.Inc(1)
 	}
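StateStoreReader() is what makes these call sites safe in both layouts: it returns the separate state store when one is wired and falls back to the database itself otherwise. A minimal sketch of that contract, assuming the interfaces added by this patch (hasAccountNode is a hypothetical helper):

package statereaddemo

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// hasAccountNode can be called unconditionally: with a single database the
// reader is the db itself, with multi-database it is the state store.
func hasAccountNode(db ethdb.Database, path []byte) bool {
	return rawdb.ExistsAccountTrieNode(db.StateStoreReader(), path)
}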
@@ -735,6 +735,7 @@ func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
 func (s *Syncer) loadSyncStatus() {
 	var progress SyncProgress
 
+	stateDiskDB := s.db.GetStateStore()
 	if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
 		if err := json.Unmarshal(status, &progress); err != nil {
 			log.Error("Failed to decode snap sync status", "err", err)
@@ -747,7 +748,7 @@ func (s *Syncer) loadSyncStatus() {
 				task := task // closure for task.genBatch in the stacktrie writer callback
 
 				task.genBatch = ethdb.HookedBatch{
-					Batch: s.db.NewBatch(),
+					Batch: stateDiskDB.NewBatch(),
 					OnPut: func(key []byte, value []byte) {
 						s.accountBytes += common.StorageSize(len(key) + len(value))
 					},
@@ -773,7 +774,7 @@ func (s *Syncer) loadSyncStatus() {
 					subtask := subtask // closure for subtask.genBatch in the stacktrie writer callback
 
 					subtask.genBatch = ethdb.HookedBatch{
-						Batch: s.db.NewBatch(),
+						Batch: stateDiskDB.NewBatch(),
 						OnPut: func(key []byte, value []byte) {
 							s.storageBytes += common.StorageSize(len(key) + len(value))
 						},
@@ -841,7 +842,7 @@ func (s *Syncer) loadSyncStatus() {
 		last = common.MaxHash
 	}
 	batch := ethdb.HookedBatch{
-		Batch: s.db.NewBatch(),
+		Batch: stateDiskDB.NewBatch(),
 		OnPut: func(key []byte, value []byte) {
 			s.accountBytes += common.StorageSize(len(key) + len(value))
 		},
@@ -1894,7 +1895,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
 		}
 		// Check if the account is a contract with an unknown storage trie
 		if account.Root != types.EmptyRootHash {
-			if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) {
+			if !rawdb.HasTrieNode(s.db.StateStoreReader(), res.hashes[i], nil, account.Root, s.scheme) {
 				// If there was a previous large state retrieval in progress,
 				// don't restart it from scratch. This happens if a sync cycle
 				// is interrupted and resumed later. However, *do* update the
@@ -1986,12 +1987,25 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 	if res.subTask != nil {
 		res.subTask.req = nil
 	}
+	var usingMultDatabase bool
 	batch := ethdb.HookedBatch{
-		Batch: s.db.NewBatch(),
+		Batch: s.db.GetStateStore().NewBatch(),
 		OnPut: func(key []byte, value []byte) {
 			s.storageBytes += common.StorageSize(len(key) + len(value))
 		},
 	}
+	var snapBatch ethdb.HookedBatch
+	if s.db.StateStore() != nil {
+		usingMultDatabase = true
+		snapBatch = ethdb.HookedBatch{
+			Batch: s.db.NewBatch(),
+			OnPut: func(key []byte, value []byte) {
+				s.storageBytes += common.StorageSize(len(key) + len(value))
+			},
+		}
+	}
 	var (
 		slots           int
 		oldStorageBytes = s.storageBytes
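The routing rule behind the two hooked batches: trie and state data always target the state store (via GetStateStore, which degrades to the main db in single-database mode), while snapshot entries must stay in the main chaindata database, so a second batch is only needed when a separate state store exists. A condensed sketch of that decision (storageBatches is hypothetical, not in the patch):

package snapdemo

import "github.com/ethereum/go-ethereum/ethdb"

// storageBatches picks destinations for the two kinds of storage writes.
func storageBatches(db ethdb.Database) (stateBatch, snapBatch ethdb.Batch, multi bool) {
	stateBatch = db.GetStateStore().NewBatch() // trie nodes and state data
	if db.StateStore() != nil {
		// Separate state store: snapshot entries keep living in chaindata.
		return stateBatch, db.NewBatch(), true
	}
	// Single database: one batch carries both kinds of writes.
	return stateBatch, stateBatch, false
}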
@@ -2061,7 +2075,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 			}
 			// Our first task is the one that was just filled by this response.
 			batch := ethdb.HookedBatch{
-				Batch: s.db.NewBatch(),
+				Batch: s.db.GetStateStore().NewBatch(),
 				OnPut: func(key []byte, value []byte) {
 					s.storageBytes += common.StorageSize(len(key) + len(value))
 				},
@@ -2088,7 +2102,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 			})
 			for r.Next() {
 				batch := ethdb.HookedBatch{
-					Batch: s.db.NewBatch(),
+					Batch: s.db.GetStateStore().NewBatch(),
 					OnPut: func(key []byte, value []byte) {
 						s.storageBytes += common.StorageSize(len(key) + len(value))
 					},
@@ -2184,8 +2198,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 		// outdated during the sync, but it can be fixed later during the
 		// snapshot generation.
 		for j := 0; j < len(res.hashes[i]); j++ {
-			rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
+			if usingMultDatabase {
+				rawdb.WriteStorageSnapshot(snapBatch, account, res.hashes[i][j], res.slots[i][j])
+			} else {
+				rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
+			}
 		}
 		// If we're storing large contracts, generate the trie nodes
 		// on the fly to not trash the gluing points
 		if i == len(res.hashes)-1 && res.subTask != nil {
@@ -2205,7 +2222,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 			// If the chunk's root is an overflown but full delivery,
 			// clear the heal request.
 			accountHash := res.accounts[len(res.accounts)-1]
-			if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
+			if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db.StateStoreReader(), accountHash, nil, root) {
 				for i, account := range res.mainTask.res.hashes {
 					if account == accountHash {
 						res.mainTask.needHeal[i] = false
@@ -2225,6 +2242,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist storage slots", "err", err)
 	}
+	if usingMultDatabase {
+		if err := snapBatch.Write(); err != nil {
+			log.Crit("Failed to persist storage slots", "err", err)
+		}
+	}
 	s.storageSynced += uint64(slots)
 
 	log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
@@ -2323,12 +2345,25 @@ func (s *Syncer) commitHealer(force bool) {
 		return
 	}
 	batch := s.db.NewBatch()
-	if err := s.healer.scheduler.Commit(batch); err != nil {
+	var stateBatch ethdb.Batch
+	var err error
+	if s.db.StateStore() != nil {
+		stateBatch = s.db.StateStore().NewBatch()
+		err = s.healer.scheduler.Commit(batch, stateBatch)
+	} else {
+		err = s.healer.scheduler.Commit(batch, nil)
+	}
+	if err != nil {
 		log.Error("Failed to commit healing data", "err", err)
 	}
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist healing data", "err", err)
 	}
+	if s.db.StateStore() != nil {
+		if err := stateBatch.Write(); err != nil {
+			log.Crit("Failed to persist healing data", "err", err)
+		}
+	}
 	log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
 }
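Healing now follows the same split: the scheduler's Commit takes a second batch and prefers it for trie nodes when non-nil. A sketch of the calling convention, assuming the two-batch trie.Sync.Commit from this patch (commitHealing is a hypothetical wrapper):

package healdemo

import (
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

func commitHealing(sched *trie.Sync, db ethdb.Database) error {
	batch := db.NewBatch()
	var stateBatch ethdb.Batch
	if db.StateStore() != nil {
		stateBatch = db.StateStore().NewBatch() // trie nodes land here
	}
	if err := sched.Commit(batch, stateBatch); err != nil { // nil is fine: single-db mode
		return err
	}
	if err := batch.Write(); err != nil {
		return err
	}
	if stateBatch != nil {
		return stateBatch.Write()
	}
	return nil
}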
@@ -180,12 +180,6 @@ type StateStoreReader interface {
 	StateStoreReader() Reader
 }
 
-type BlockStore interface {
-	BlockStore() Database
-	SetBlockStore(block Database)
-	HasSeparateBlockStore() bool
-}
-
 type BlockStoreReader interface {
 	BlockStoreReader() Reader
 }
@@ -194,6 +188,14 @@ type BlockStoreWriter interface {
 	BlockStoreWriter() Writer
 }
 
+// MultiDatabaseReader contains the methods required to read data from both key-value as well as
+// blockStore or stateStore.
+type MultiDatabaseReader interface {
+	KeyValueReader
+	StateStoreReader
+	BlockStoreReader
+}
+
 // Reader contains the methods required to read data from both key-value as well as
 // immutable ancient data.
 type Reader interface {
@@ -234,6 +236,13 @@ type DiffStore interface {
 type StateStore interface {
 	StateStore() Database
 	SetStateStore(state Database)
+	GetStateStore() Database
+}
+
+type BlockStore interface {
+	BlockStore() Database
+	SetBlockStore(block Database)
+	HasSeparateBlockStore() bool
 }
 
 // Database contains all the methods required by the high level database to not
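MultiDatabaseReader exists so that read helpers can accept one value and still reach whichever store holds the data. A sketch of a consumer, assuming the facets declared above (headAndRoot is hypothetical):

package ifacedemo

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// headAndRoot reads chain metadata through the block-store facet and a trie
// node presence check through the state-store facet of a single reader value.
func headAndRoot(r ethdb.MultiDatabaseReader, path []byte) (common.Hash, bool) {
	head := rawdb.ReadHeadHeaderHash(r.BlockStoreReader())
	exists := rawdb.ExistsAccountTrieNode(r.StateStoreReader(), path)
	return head, exists
}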
@@ -39,6 +39,9 @@ var (
 	// errSnapshotReleased is returned if callers want to retrieve data from a
 	// released snapshot.
 	errSnapshotReleased = errors.New("snapshot released")
+
+	// errNotSupported is returned if the database doesn't support the required operation.
+	errNotSupported = errors.New("this operation is not supported")
 )
 
 // Database is an ephemeral key-value store. Apart from basic data storage
@ -47,6 +50,84 @@ var (
|
|||||||
type Database struct {
|
type Database struct {
|
||||||
db map[string][]byte
|
db map[string][]byte
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
|
stateStore ethdb.Database
|
||||||
|
blockStore ethdb.Database
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) TruncateHead(n uint64) (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) TruncateTail(n uint64) (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) Sync() error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) TruncateTableTail(kind string, tail uint64) (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) HasAncient(kind string, number uint64) (bool, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) Ancient(kind string, number uint64) ([]byte, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) Ancients() (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) Tail() (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) AncientSize(kind string) (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) ItemAmountInAncient() (uint64, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) AncientOffSet() uint64 {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a wrapped map with all the required database interface methods
|
// New returns a wrapped map with all the required database interface methods
|
||||||
@ -204,6 +285,37 @@ func (db *Database) Len() int {
|
|||||||
return len(db.db)
|
return len(db.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) StateStoreReader() ethdb.Reader {
|
||||||
|
if db.stateStore == nil {
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
return db.stateStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) BlockStoreReader() ethdb.Reader {
|
||||||
|
if db.blockStore == nil {
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
return db.blockStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) BlockStoreWriter() ethdb.Writer {
|
||||||
|
if db.blockStore == nil {
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
return db.blockStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertLegacyFn takes a raw freezer entry in an older format and
|
||||||
|
// returns it in the new format.
|
||||||
|
type convertLegacyFn = func([]byte) ([]byte, error)
|
||||||
|
|
||||||
|
// MigrateTable processes the entries in a given table in sequence
|
||||||
|
// converting them to a new format if they're of an old format.
|
||||||
|
func (db *Database) MigrateTable(kind string, convert convertLegacyFn) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
// keyvalue is a key-value tuple tagged with a deletion field to allow creating
|
// keyvalue is a key-value tuple tagged with a deletion field to allow creating
|
||||||
// memory-database write batches.
|
// memory-database write batches.
|
||||||
type keyvalue struct {
|
type keyvalue struct {
|
||||||
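For the in-memory database the new facets are pure fallbacks: with no separate stores wired, each reader is the database itself, which is what keeps existing unit tests working unchanged. A small illustration under that assumption:

package memdemo

import (
	"fmt"

	"github.com/ethereum/go-ethereum/ethdb/memorydb"
)

func demo() {
	db := memorydb.New()
	_ = db.Put([]byte("k"), []byte("v"))
	// Neither a state store nor a block store is attached, so both reader
	// facets fall back to the database itself.
	ok, _ := db.StateStoreReader().Has([]byte("k"))
	fmt.Println(ok) // true: served straight from db's own map
}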
@@ -122,6 +122,10 @@ func (db *Database) SetStateStore(state ethdb.Database) {
 	panic("not supported")
 }
 
+func (db *Database) GetStateStore() ethdb.Database {
+	panic("not supported")
+}
+
 func (db *Database) StateStoreReader() ethdb.Reader {
 	return db
 }
@@ -33,11 +33,11 @@ func (api *DebugAPI) DbGet(key string) (hexutil.Bytes, error) {
 // DbAncient retrieves an ancient binary blob from the append-only immutable files.
 // It is a mapping to the `AncientReaderOp.Ancient` method
 func (api *DebugAPI) DbAncient(kind string, number uint64) (hexutil.Bytes, error) {
-	return api.b.ChainDb().Ancient(kind, number)
+	return api.b.ChainDb().BlockStore().Ancient(kind, number)
 }
 
 // DbAncients returns the ancient item numbers in the ancient store.
 // It is a mapping to the `AncientReaderOp.Ancients` method
 func (api *DebugAPI) DbAncients() (uint64, error) {
-	return api.b.ChainDb().Ancients()
+	return api.b.ChainDb().BlockStore().Ancients()
 }
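The debug handlers can call BlockStore() unconditionally because the accessor is expected to fall back to the main database when no separate block store is attached (mirroring StateStoreReader above). A hedged sketch of the same lookup outside the RPC layer (ancientHeader is hypothetical; "headers" is the chain freezer table also named by rawdb.ChainFreezerHeaderTable):

package apidemo

import "github.com/ethereum/go-ethereum/ethdb"

// ancientHeader fetches an ancient header blob wherever the freezer lives.
func ancientHeader(db ethdb.Database, number uint64) ([]byte, error) {
	return db.BlockStore().Ancient("headers", number)
}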
node/node.go
@@ -78,7 +78,7 @@ const (
 	blockDbHandlesMinSize   = 1000
 	blockDbHandlesMaxSize   = 2000
 	chainDbMemoryPercentage = 50
-	chainDbHandlesPercentage
+	chainDbHandlesPercentage   = 50
 	diffStoreHandlesPercentage = 20
 )
 
@@ -799,6 +799,7 @@ func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool
 		diffStoreHandles int
 		chainDataHandles = config.DatabaseHandles
 		chainDbCache     = config.DatabaseCache
+		stateDbCache, stateDbHandles int
 	)
 
 	if config.PersistDiff {
@@ -818,10 +819,17 @@ func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool
 		} else {
 			blockDbHandlesSize = blockDbHandlesMinSize
 		}
-		stateDbCache := config.DatabaseCache - chainDbCache - blockDbCacheSize
-		stateDbHandles := config.DatabaseHandles - chainDataHandles - blockDbHandlesSize
+		stateDbCache = config.DatabaseCache - chainDbCache - blockDbCacheSize
+		stateDbHandles = config.DatabaseHandles - chainDataHandles - blockDbHandlesSize
 		disableChainDbFreeze = true
+	}
+
+	chainDB, err := n.OpenDatabaseWithFreezer(name, chainDbCache, chainDataHandles, config.DatabaseFreezer, namespace, readonly, disableChainDbFreeze, false, config.PruneAncientData)
+	if err != nil {
+		return nil, err
+	}
+
+	if isMultiDatabase {
 		// Allocate half of the handles and chainDbCache to this separate state data database
 		stateDiskDb, err = n.OpenDatabaseWithFreezer(name+"/state", stateDbCache, stateDbHandles, "", "eth/db/statedata/", readonly, true, false, config.PruneAncientData)
 		if err != nil {
@@ -833,14 +841,6 @@ func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool
 			return nil, err
 		}
 		log.Warn("Multi-database is an experimental feature")
-	}
-
-	chainDB, err := n.OpenDatabaseWithFreezer(name, chainDbCache, chainDataHandles, config.DatabaseFreezer, namespace, readonly, disableChainDbFreeze, false, config.PruneAncientData)
-	if err != nil {
-		return nil, err
-	}
-
-	if isMultiDatabase {
 		chainDB.SetStateStore(stateDiskDb)
 		chainDB.SetBlockStore(blockDb)
 	}
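The reorder above fixes an ordering dependency (the chain database must exist before the separate stores are wired into it) while the sizing follows a remainder rule: the chain database keeps its configured share, the block database takes a bounded slice, and the state database receives what is left. A worked sketch of that arithmetic only, under the assumption that the chain and block budgets are computed as in the function above (splitStateBudget is hypothetical):

package nodedemo

// splitStateBudget mirrors the assignments in OpenAndMergeDatabase: whatever
// the chain and block databases do not claim is handed to the state store.
func splitStateBudget(totalCache, chainCache, blockCache, totalHandles, chainHandles, blockHandles int) (stateCache, stateHandles int) {
	stateCache = totalCache - chainCache - blockCache
	stateHandles = totalHandles - chainHandles - blockHandles
	return stateCache, stateHandles
}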
trie/sync.go
@@ -229,7 +229,7 @@ func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) {
 // and reconstructs the trie step by step until all is done.
 type Sync struct {
 	scheme   string                       // Node scheme descriptor used in database.
-	database ethdb.KeyValueReader         // Persistent database to check for existing entries
+	database ethdb.Database               // Persistent database to check for existing entries
 	membatch *syncMemBatch                // Memory buffer to avoid frequent database writes
 	nodeReqs map[string]*nodeRequest      // Pending requests pertaining to a trie node path
 	codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash
@@ -238,7 +238,7 @@ type Sync struct {
 }
 
 // NewSync creates a new trie data download scheduler.
-func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, scheme string) *Sync {
+func NewSync(root common.Hash, database ethdb.Database, callback LeafCallback, scheme string) *Sync {
 	ts := &Sync{
 		scheme:   scheme,
 		database: database,
@@ -420,7 +420,7 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
 // Commit flushes the data stored in the internal membatch out to persistent
 // storage, returning any occurred error. The whole data set will be flushed
 // in an atomic database batch.
-func (s *Sync) Commit(dbw ethdb.Batch) error {
+func (s *Sync) Commit(dbw ethdb.Batch, stateBatch ethdb.Batch) error {
 	// Flush the pending node writes into database batch.
 	var (
 		account int
@@ -430,10 +430,18 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
 		if op.isDelete() {
 			// node deletion is only supported in path mode.
 			if op.owner == (common.Hash{}) {
+				if stateBatch != nil {
+					rawdb.DeleteAccountTrieNode(stateBatch, op.path)
+				} else {
+					rawdb.DeleteAccountTrieNode(dbw, op.path)
+				}
 			} else {
+				if stateBatch != nil {
+					rawdb.DeleteStorageTrieNode(stateBatch, op.owner, op.path)
+				} else {
+					rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path)
+				}
 			}
 			deletionGauge.Inc(1)
 		} else {
 			if op.owner == (common.Hash{}) {
@@ -441,9 +449,13 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
 			} else {
 				storage += 1
 			}
+			if stateBatch != nil {
+				rawdb.WriteTrieNode(stateBatch, op.owner, op.path, op.hash, op.blob, s.scheme)
+			} else {
 				rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme)
+			}
 		}
 	}
 	accountNodeSyncedGauge.Inc(int64(account))
 	storageNodeSyncedGauge.Inc(int64(storage))
 
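The repeated `if stateBatch != nil` branches implement one rule: when a state batch is supplied, every trie-node write or deletion goes to it; otherwise the main batch is used. A hypothetical condensation of that rule (targetBatch is not in the patch):

package triedemo

import "github.com/ethereum/go-ethereum/ethdb"

// targetBatch picks the destination for trie-node mutations.
func targetBatch(dbw, stateBatch ethdb.Batch) ethdb.Batch {
	if stateBatch != nil {
		return stateBatch // separate state store: trie nodes belong there
	}
	return dbw // single database: everything funnels into the main batch
}

With such a helper the call sites would collapse to `rawdb.WriteTrieNode(targetBatch(dbw, stateBatch), ...)`; the patch keeps the explicit branches instead.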
@ -546,9 +558,9 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
|||||||
// the performance impact negligible.
|
// the performance impact negligible.
|
||||||
var exists bool
|
var exists bool
|
||||||
if owner == (common.Hash{}) {
|
if owner == (common.Hash{}) {
|
||||||
exists = rawdb.ExistsAccountTrieNode(s.database, append(inner, key[:i]...))
|
exists = rawdb.ExistsAccountTrieNode(s.database.StateStoreReader(), append(inner, key[:i]...))
|
||||||
} else {
|
} else {
|
||||||
exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...))
|
exists = rawdb.ExistsStorageTrieNode(s.database.StateStoreReader(), owner, append(inner, key[:i]...))
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
s.membatch.delNode(owner, append(inner, key[:i]...))
|
s.membatch.delNode(owner, append(inner, key[:i]...))
|
||||||
@ -687,15 +699,15 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
|
|||||||
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
|
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
|
||||||
// If node is running with hash scheme, check the presence with node hash.
|
// If node is running with hash scheme, check the presence with node hash.
|
||||||
if s.scheme == rawdb.HashScheme {
|
if s.scheme == rawdb.HashScheme {
|
||||||
return rawdb.HasLegacyTrieNode(s.database, hash), false
|
return rawdb.HasLegacyTrieNode(s.database.StateStoreReader(), hash), false
|
||||||
}
|
}
|
||||||
// If node is running with path scheme, check the presence with node path.
|
// If node is running with path scheme, check the presence with node path.
|
||||||
var blob []byte
|
var blob []byte
|
||||||
var dbHash common.Hash
|
var dbHash common.Hash
|
||||||
if owner == (common.Hash{}) {
|
if owner == (common.Hash{}) {
|
||||||
blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path)
|
blob, dbHash = rawdb.ReadAccountTrieNode(s.database.StateStoreReader(), path)
|
||||||
} else {
|
} else {
|
||||||
blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path)
|
blob, dbHash = rawdb.ReadStorageTrieNode(s.database.StateStoreReader(), owner, path)
|
||||||
}
|
}
|
||||||
exists = hash == dbHash
|
exists = hash == dbHash
|
||||||
inconsistent = !exists && len(blob) != 0
|
inconsistent = !exists && len(blob) != 0
|
||||||
@@ -27,7 +27,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/ethdb/memorydb"
 	"github.com/ethereum/go-ethereum/trie/trienode"
 )
 
|
|||||||
emptyD, _ := New(TrieID(types.EmptyRootHash), dbD)
|
emptyD, _ := New(TrieID(types.EmptyRootHash), dbD)
|
||||||
|
|
||||||
for i, trie := range []*Trie{emptyA, emptyB, emptyC, emptyD} {
|
for i, trie := range []*Trie{emptyA, emptyB, emptyC, emptyD} {
|
||||||
sync := NewSync(trie.Hash(), memorydb.New(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme())
|
sync := NewSync(trie.Hash(), rawdb.NewMemoryDatabase(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme())
|
||||||
if paths, nodes, codes := sync.Missing(1); len(paths) != 0 || len(nodes) != 0 || len(codes) != 0 {
|
if paths, nodes, codes := sync.Missing(1); len(paths) != 0 || len(nodes) != 0 || len(codes) != 0 {
|
||||||
t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, paths, nodes, codes)
|
t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, paths, nodes, codes)
|
||||||
}
|
}
|
||||||
@ -212,7 +211,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -278,7 +277,7 @@ func testIterativeDelayedSync(t *testing.T, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -348,7 +347,7 @@ func testIterativeRandomSync(t *testing.T, count int, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -419,7 +418,7 @@ func testIterativeRandomDelayedSync(t *testing.T, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -491,7 +490,7 @@ func testDuplicateAvoidanceSync(t *testing.T, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -563,7 +562,7 @@ func testIncompleteSync(t *testing.T, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -653,7 +652,7 @@ func testSyncOrdering(t *testing.T, scheme string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := diskdb.NewBatch()
|
batch := diskdb.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
batch.Write()
|
batch.Write()
|
||||||
@ -723,7 +722,7 @@ func syncWithHookWriter(t *testing.T, root common.Hash, db ethdb.Database, srcDb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
batch := db.NewBatch()
|
batch := db.NewBatch()
|
||||||
if err := sched.Commit(batch); err != nil {
|
if err := sched.Commit(batch, nil); err != nil {
|
||||||
t.Fatalf("failed to commit data: %v", err)
|
t.Fatalf("failed to commit data: %v", err)
|
||||||
}
|
}
|
||||||
if hookWriter != nil {
|
if hookWriter != nil {
|
||||||