From 87e622e51fab01204f6d57712f9d86192e7a9ac5 Mon Sep 17 00:00:00 2001 From: Chris Li Date: Tue, 16 Jul 2024 22:37:03 +0800 Subject: [PATCH] fix: the bug of blobsidecars and downloader with multi-database (#2564) --- cmd/geth/chaincmd.go | 3 +- cmd/geth/dbcmd.go | 12 ++-- cmd/geth/main.go | 1 + cmd/utils/flags.go | 1 - core/blockchain.go | 30 ++++----- core/blockchain_reader.go | 4 +- core/chain_indexer.go | 4 +- core/headerchain.go | 8 +-- core/rawdb/accessors_chain.go | 61 +++++++++-------- core/rawdb/accessors_indexes.go | 2 +- core/rawdb/chain_freezer.go | 6 +- core/rawdb/chain_iterator.go | 16 ++--- core/rawdb/database.go | 78 +++++++++++----------- core/rawdb/table.go | 4 ++ core/state/sync.go | 2 +- core/state/sync_test.go | 10 +-- eth/downloader/downloader.go | 10 +-- eth/protocols/snap/sync.go | 67 ++++++++++++++----- ethdb/database.go | 21 ++++-- ethdb/memorydb/memorydb.go | 112 ++++++++++++++++++++++++++++++++ ethdb/remotedb/remotedb.go | 4 ++ internal/ethapi/dbapi.go | 4 +- node/node.go | 46 ++++++------- trie/sync.go | 34 ++++++---- trie/sync_test.go | 19 +++--- 25 files changed, 371 insertions(+), 188 deletions(-) diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 27bc72e4d..afcb47484 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -64,6 +64,7 @@ var ( utils.CachePreimagesFlag, utils.OverrideBohr, utils.OverrideVerkle, + utils.MultiDataBaseFlag, }, utils.DatabaseFlags), Description: ` The init command initializes a new genesis block and definition for the network. @@ -759,7 +760,7 @@ func parseDumpConfig(ctx *cli.Context, stack *node.Node) (*state.DumpConfig, eth arg := ctx.Args().First() if hashish(arg) { hash := common.HexToHash(arg) - if number := rawdb.ReadHeaderNumber(db.BlockStore(), hash); number != nil { + if number := rawdb.ReadHeaderNumber(db, hash); number != nil { header = rawdb.ReadHeader(db, hash, *number) } else { return nil, nil, common.Hash{}, fmt.Errorf("block %x not found", hash) diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index 0b2017eac..282f035bb 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -397,8 +397,8 @@ func inspectTrie(ctx *cli.Context) error { var headerBlockHash common.Hash if ctx.NArg() >= 1 { if ctx.Args().Get(0) == "latest" { - headerHash := rawdb.ReadHeadHeaderHash(db.BlockStore()) - blockNumber = *(rawdb.ReadHeaderNumber(db.BlockStore(), headerHash)) + headerHash := rawdb.ReadHeadHeaderHash(db) + blockNumber = *(rawdb.ReadHeaderNumber(db, headerHash)) } else if ctx.Args().Get(0) == "snapshot" { trieRootHash = rawdb.ReadSnapshotRoot(db) blockNumber = math.MaxUint64 @@ -1212,7 +1212,7 @@ func showMetaData(ctx *cli.Context) error { if err != nil { fmt.Fprintf(os.Stderr, "Error accessing ancients: %v", err) } - data := rawdb.ReadChainMetadataFromMultiDatabase(db) + data := rawdb.ReadChainMetadata(db) data = append(data, []string{"frozen", fmt.Sprintf("%d items", ancients)}) data = append(data, []string{"snapshotGenerator", snapshot.ParseGeneratorStatus(rawdb.ReadSnapshotGenerator(db))}) if b := rawdb.ReadHeadBlock(db); b != nil { @@ -1255,7 +1255,7 @@ func hbss2pbss(ctx *cli.Context) error { defer stack.Close() db := utils.MakeChainDatabase(ctx, stack, false, false) - db.Sync() + db.BlockStore().Sync() stateDiskDb := db.StateStore() defer db.Close() @@ -1273,8 +1273,8 @@ func hbss2pbss(ctx *cli.Context) error { log.Info("hbss2pbss triedb", "scheme", triedb.Scheme()) defer triedb.Close() - headerHash := rawdb.ReadHeadHeaderHash(db.BlockStore()) - blockNumber := rawdb.ReadHeaderNumber(db.BlockStore(), headerHash) + headerHash := rawdb.ReadHeadHeaderHash(db) + blockNumber := rawdb.ReadHeaderNumber(db, headerHash) if blockNumber == nil { log.Error("read header number failed.") return fmt.Errorf("read header number failed") diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 47ba3c680..a79ef422f 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -125,6 +125,7 @@ var ( utils.CacheSnapshotFlag, // utils.CacheNoPrefetchFlag, utils.CachePreimagesFlag, + utils.MultiDataBaseFlag, utils.PersistDiffFlag, utils.DiffBlockFlag, utils.PruneAncientDataFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d6c1dd368..62b3b2f39 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1153,7 +1153,6 @@ var ( DBEngineFlag, StateSchemeFlag, HttpHeaderFlag, - MultiDataBaseFlag, } ) diff --git a/core/blockchain.go b/core/blockchain.go index 7f905bf55..a91b14917 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -462,8 +462,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } } // Ensure that a previous crash in SetHead doesn't leave extra ancients - if frozen, err := bc.db.ItemAmountInAncient(); err == nil && frozen > 0 { - frozen, err = bc.db.Ancients() + if frozen, err := bc.db.BlockStore().ItemAmountInAncient(); err == nil && frozen > 0 { + frozen, err = bc.db.BlockStore().Ancients() if err != nil { return nil, err } @@ -663,7 +663,7 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, diffLayerCh cha // into node seamlessly. func (bc *BlockChain) empty() bool { genesis := bc.genesisBlock.Hash() - for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db.BlockStore())} { + for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db), rawdb.ReadHeadHeaderHash(bc.db), rawdb.ReadHeadFastBlockHash(bc.db)} { if hash != genesis { return false } @@ -699,7 +699,7 @@ func (bc *BlockChain) getFinalizedNumber(header *types.Header) uint64 { // assumes that the chain manager mutex is held. func (bc *BlockChain) loadLastState() error { // Restore the last known head block - head := rawdb.ReadHeadBlockHash(bc.db.BlockStore()) + head := rawdb.ReadHeadBlockHash(bc.db) if head == (common.Hash{}) { // Corrupt or empty database, init from scratch log.Warn("Empty database, resetting chain") @@ -721,7 +721,7 @@ func (bc *BlockChain) loadLastState() error { // Restore the last known head header headHeader := headBlock.Header() - if head := rawdb.ReadHeadHeaderHash(bc.db.BlockStore()); head != (common.Hash{}) { + if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) { if header := bc.GetHeaderByHash(head); header != nil { headHeader = header } @@ -732,7 +732,7 @@ func (bc *BlockChain) loadLastState() error { bc.currentSnapBlock.Store(headBlock.Header()) headFastBlockGauge.Update(int64(headBlock.NumberU64())) - if head := rawdb.ReadHeadFastBlockHash(bc.db.BlockStore()); head != (common.Hash{}) { + if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) { if block := bc.GetBlockByHash(head); block != nil { bc.currentSnapBlock.Store(block.Header()) headFastBlockGauge.Update(int64(block.NumberU64())) @@ -1100,7 +1100,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // intent afterwards is full block importing, delete the chain segment // between the stateful-block and the sethead target. var wipe bool - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.BlockStore().Ancients() if headNumber+1 < frozen { wipe = pivot == nil || headNumber >= *pivot } @@ -1109,11 +1109,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // Rewind the header chain, deleting all block bodies until then delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.BlockStore().Ancients() if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if _, err := bc.db.TruncateHead(num); err != nil { + if _, err := bc.db.BlockStore().TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -1556,9 +1556,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Ensure genesis is in ancients. if first.NumberU64() == 1 { - if frozen, _ := bc.db.Ancients(); frozen == 0 { + if frozen, _ := bc.db.BlockStore().Ancients(); frozen == 0 { td := bc.genesisBlock.Difficulty() - writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td) + writeSize, err := rawdb.WriteAncientBlocks(bc.db.BlockStore(), []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td) if err != nil { log.Error("Error writing genesis to ancients", "err", err) return 0, err @@ -1576,7 +1576,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Write all chain data to ancients. td := bc.GetTd(first.Hash(), first.NumberU64()) - writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td) + writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db.BlockStore(), blockChain, receiptChain, td) if err != nil { log.Error("Error importing chain data to ancients", "err", err) return 0, err @@ -1584,7 +1584,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ size += writeSize // Sync the ancient store explicitly to ensure all data has been flushed to disk. - if err := bc.db.Sync(); err != nil { + if err := bc.db.BlockStore().Sync(); err != nil { return 0, err } // Update the current snap block because all block data is now present in DB. @@ -1592,7 +1592,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil { + if _, err := bc.db.BlockStore().TruncateHead(previousSnapBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts @@ -1612,7 +1612,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.DeleteBlockWithoutNumber(blockBatch, block.Hash(), block.NumberU64()) } // Delete side chain hash-to-number mappings. - for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { + for _, nh := range rawdb.ReadAllHashesInRange(bc.db.BlockStore(), first.NumberU64(), last.NumberU64()) { if _, canon := canonHashes[nh.Hash]; !canon { rawdb.DeleteHeader(blockBatch, nh.Hash, nh.Number) } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index d440590b8..84bbbc25f 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -231,7 +231,7 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { if receipts, ok := bc.receiptsCache.Get(hash); ok { return receipts } - number := rawdb.ReadHeaderNumber(bc.db.BlockStore(), hash) + number := rawdb.ReadHeaderNumber(bc.db, hash) if number == nil { return nil } @@ -514,7 +514,7 @@ func (bc *BlockChain) SubscribeFinalizedHeaderEvent(ch chan<- FinalizedHeaderEve // AncientTail retrieves the tail the ancients blocks func (bc *BlockChain) AncientTail() (uint64, error) { - tail, err := bc.db.Tail() + tail, err := bc.db.BlockStore().Tail() if err != nil { return 0, err } diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 6ccca69e5..f5fce7258 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -227,8 +227,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then) // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? - if rawdb.ReadCanonicalHash(c.chainDb.BlockStore(), prevHeader.Number.Uint64()) != prevHash { - if h := rawdb.FindCommonAncestor(c.chainDb.BlockStore(), prevHeader, header); h != nil { + if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash { + if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { c.newHead(h.Number.Uint64(), true) } } diff --git a/core/headerchain.go b/core/headerchain.go index f09ab4347..f2cc0b698 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -97,7 +97,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c return nil, ErrNoGenesis } hc.currentHeader.Store(hc.genesisHeader) - if head := rawdb.ReadHeadBlockHash(chainDb.BlockStore()); head != (common.Hash{}) { + if head := rawdb.ReadHeadBlockHash(chainDb); head != (common.Hash{}) { if chead := hc.GetHeaderByHash(head); chead != nil { hc.currentHeader.Store(chead) } @@ -144,7 +144,7 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 { if cached, ok := hc.numberCache.Get(hash); ok { return &cached } - number := rawdb.ReadHeaderNumber(hc.chainDb.BlockStore(), hash) + number := rawdb.ReadHeaderNumber(hc.chainDb, hash) if number != nil { hc.numberCache.Add(hash, *number) } @@ -691,7 +691,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat // we don't end up with dangling daps in the database var nums []uint64 if origin { - for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb, n)) > 0; n++ { + for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb.BlockStore(), n)) > 0; n++ { nums = append([]uint64{n}, nums...) // suboptimal, but we don't really expect this path } origin = false @@ -701,7 +701,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat // Remove the related data from the database on all sidechains for _, num := range nums { // Gather all the side fork hashes - hashes := rawdb.ReadAllHashes(hc.chainDb, num) + hashes := rawdb.ReadAllHashes(hc.chainDb.BlockStore(), num) if len(hashes) == 0 { // No hashes in the database whatsoever, probably frozen already hashes = append(hashes, hdr.Hash()) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index a96376d34..af3cb01ce 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -34,6 +34,15 @@ import ( "golang.org/x/exp/slices" ) +// Support Multi-Database Based on Data Pattern, the Chaindata will be divided into three stores: BlockStore, StateStore, and ChainStore, +// according to data schema and read/write behavior. When using the following data interfaces, you should take note of the following: +// +// 1) Block-Related Data: For CanonicalHash, Header, Body, Td, Receipts, and BlobSidecars, the Write, Delete, and Iterator +// operations should carefully ensure that the database being used is BlockStore. +// 2) Meta-Related Data: For HeaderNumber, HeadHeaderHash, HeadBlockHash, HeadFastBlockHash, and FinalizedBlockHash, the +// Write and Delete operations should carefully ensure that the database being used is BlockStore. +// 3) Ancient Data: When using a multi-database, Ancient data will use the BlockStore. + // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { var data []byte @@ -144,8 +153,8 @@ func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int } // ReadHeaderNumber returns the header number assigned to a hash. -func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { - data, _ := db.Get(headerNumberKey(hash)) +func ReadHeaderNumber(db ethdb.MultiDatabaseReader, hash common.Hash) *uint64 { + data, _ := db.BlockStoreReader().Get(headerNumberKey(hash)) if len(data) != 8 { return nil } @@ -170,8 +179,8 @@ func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadHeaderHash retrieves the hash of the current canonical head header. -func ReadHeadHeaderHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headHeaderKey) +func ReadHeadHeaderHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headHeaderKey) if len(data) == 0 { return common.Hash{} } @@ -186,8 +195,8 @@ func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadBlockHash retrieves the hash of the current canonical head block. -func ReadHeadBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headBlockKey) +func ReadHeadBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headBlockKey) if len(data) == 0 { return common.Hash{} } @@ -202,8 +211,8 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadHeadFastBlockHash retrieves the hash of the current fast-sync head block. -func ReadHeadFastBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headFastBlockKey) +func ReadHeadFastBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headFastBlockKey) if len(data) == 0 { return common.Hash{} } @@ -218,8 +227,8 @@ func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } // ReadFinalizedBlockHash retrieves the hash of the finalized block. -func ReadFinalizedBlockHash(db ethdb.KeyValueReader) common.Hash { - data, _ := db.Get(headFinalizedBlockKey) +func ReadFinalizedBlockHash(db ethdb.MultiDatabaseReader) common.Hash { + data, _ := db.BlockStoreReader().Get(headFinalizedBlockKey) if len(data) == 0 { return common.Hash{} } @@ -297,7 +306,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu // It's ok to request block 0, 1 item count = number + 1 } - limit, _ := db.Ancients() + limit, _ := db.BlockStoreReader().Ancients() // First read live blocks if i >= limit { // If we need to read live blocks, we need to figure out the hash first @@ -317,7 +326,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu return rlpHeaders } // read remaining from ancients, cap at 2M - data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024) + data, err := db.BlockStoreReader().AncientRange(ChainFreezerHeaderTable, i+1-count, count, 2*1024*1024) if err != nil { log.Error("Failed to read headers from freezer", "err", err) return rlpHeaders @@ -468,7 +477,7 @@ func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { // Block is not in ancients, read from leveldb by hash and number. // Note: ReadCanonicalHash cannot be used here because it also // calls ReadAncients internally. - hash, _ := db.Get(headerHashKey(number)) + hash, _ := db.BlockStoreReader().Get(headerHashKey(number)) data, _ = db.BlockStoreReader().Get(blockBodyKey(number, common.BytesToHash(hash))) return nil }) @@ -516,6 +525,13 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t WriteBodyRLP(db, hash, number, data) } +// DeleteBody removes all block body data associated with a hash. +func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Delete(blockBodyKey(number, hash)); err != nil { + log.Crit("Failed to delete block body", "err", err) + } +} + func WriteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash, layer *types.DiffLayer) { data, err := rlp.EncodeToBytes(layer) if err != nil { @@ -554,13 +570,6 @@ func DeleteDiffLayer(db ethdb.KeyValueWriter, blockHash common.Hash) { } } -// DeleteBody removes all block body data associated with a hash. -func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { - if err := db.Delete(blockBodyKey(number, hash)); err != nil { - log.Crit("Failed to delete block body", "err", err) - } -} - // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte @@ -884,7 +893,7 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts // ReadBlobSidecarsRLP retrieves all the transaction blobs belonging to a block in RLP encoding. func ReadBlobSidecarsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReaderOp) error { + db.BlockStoreReader().ReadAncients(func(reader ethdb.AncientReaderOp) error { // Check if the data is in ancients if isCanon(reader, number, hash) { data, _ = reader.Ancient(ChainFreezerBlobSidecarTable, number) @@ -1093,24 +1102,24 @@ func FindCommonAncestor(db ethdb.Reader, a, b *types.Header) *types.Header { // ReadHeadHeader returns the current canonical head header. func ReadHeadHeader(db ethdb.Reader) *types.Header { - headHeaderHash := ReadHeadHeaderHash(db.BlockStoreReader()) + headHeaderHash := ReadHeadHeaderHash(db) if headHeaderHash == (common.Hash{}) { return nil } - headHeaderNumber := ReadHeaderNumber(db.BlockStoreReader(), headHeaderHash) + headHeaderNumber := ReadHeaderNumber(db, headHeaderHash) if headHeaderNumber == nil { return nil } - return ReadHeader(db.BlockStoreReader(), headHeaderHash, *headHeaderNumber) + return ReadHeader(db, headHeaderHash, *headHeaderNumber) } // ReadHeadBlock returns the current canonical head block. func ReadHeadBlock(db ethdb.Reader) *types.Block { - headBlockHash := ReadHeadBlockHash(db.BlockStoreReader()) + headBlockHash := ReadHeadBlockHash(db) if headBlockHash == (common.Hash{}) { return nil } - headBlockNumber := ReadHeaderNumber(db.BlockStoreReader(), headBlockHash) + headBlockNumber := ReadHeaderNumber(db, headBlockHash) if headBlockNumber == nil { return nil } diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 95fc1d780..4f2ef0a88 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -42,7 +42,7 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 { } // Database v4-v5 tx lookup format just stores the hash if len(data) == common.HashLength { - return ReadHeaderNumber(db.BlockStoreReader(), common.BytesToHash(data)) + return ReadHeaderNumber(db, common.BytesToHash(data)) } // Finally try database v3 tx lookup format var entry LegacyTxLookupEntry diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index ac7a80905..3a1828110 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -92,7 +92,7 @@ func (f *chainFreezer) Close() error { // readHeadNumber returns the number of chain head block. 0 is returned if the // block is unknown or not available yet. -func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 { +func (f *chainFreezer) readHeadNumber(db ethdb.Reader) uint64 { hash := ReadHeadBlockHash(db) if hash == (common.Hash{}) { log.Error("Head block is not reachable") @@ -108,7 +108,7 @@ func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 { // readFinalizedNumber returns the number of finalized block. 0 is returned // if the block is unknown or not available yet. -func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 { +func (f *chainFreezer) readFinalizedNumber(db ethdb.Reader) uint64 { hash := ReadFinalizedBlockHash(db) if hash == (common.Hash{}) { return 0 @@ -123,7 +123,7 @@ func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 { // freezeThreshold returns the threshold for chain freezing. It's determined // by formula: max(finality, HEAD-params.FullImmutabilityThreshold). -func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) { +func (f *chainFreezer) freezeThreshold(db ethdb.Reader) (uint64, error) { var ( head = f.readHeadNumber(db) final = f.readFinalizedNumber(db) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 387040108..f82df9d96 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -35,16 +35,16 @@ import ( // injects into the database the block hash->number mappings. func InitDatabaseFromFreezer(db ethdb.Database) { // If we can't access the freezer or it's empty, abort - frozen, err := db.ItemAmountInAncient() + frozen, err := db.BlockStore().ItemAmountInAncient() if err != nil || frozen == 0 { return } var ( - batch = db.NewBatch() + batch = db.BlockStore().NewBatch() start = time.Now() logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log hash common.Hash - offset = db.AncientOffSet() + offset = db.BlockStore().AncientOffSet() ) for i := uint64(0) + offset; i < frozen+offset; i++ { // We read 100K hashes at a time, for a total of 3.2M @@ -52,7 +52,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) { if i+count > frozen+offset { count = frozen + offset - i } - data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count) + data, err := db.BlockStore().AncientRange(ChainFreezerHashTable, i, count, 32*count) if err != nil { log.Crit("Failed to init database from freezer", "err", err) } @@ -100,7 +100,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool number uint64 rlp rlp.RawValue } - if offset := db.AncientOffSet(); offset > from { + if offset := db.BlockStore().AncientOffSet(); offset > from { from = offset } if to <= from { @@ -122,7 +122,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool } defer close(rlpCh) for n != end { - data := ReadCanonicalBodyRLP(db.BlockStore(), n) + data := ReadCanonicalBodyRLP(db, n) // Feed the block to the aggregator, or abort on interrupt select { case rlpCh <- &numberRlp{n, data}: @@ -187,7 +187,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // signal received. func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range - if offset := db.AncientOffSet(); offset > from { + if offset := db.BlockStore().AncientOffSet(); offset > from { from = offset } if from >= to { @@ -286,7 +286,7 @@ func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, inte // signal received. func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range - if offset := db.AncientOffSet(); offset > from { + if offset := db.BlockStore().AncientOffSet(); offset > from { from = offset } if from >= to { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index ad4e3f81c..5049ade33 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -61,8 +61,10 @@ func (frdb *freezerdb) BlockStoreReader() ethdb.Reader { } func (frdb *freezerdb) BlockStoreWriter() ethdb.Writer { - // TODO implement me - panic("implement me") + if frdb.blockStore == nil { + return frdb + } + return frdb.blockStore } // AncientDatadir returns the path of root ancient directory. @@ -116,6 +118,13 @@ func (frdb *freezerdb) StateStore() ethdb.Database { return frdb.stateStore } +func (frdb *freezerdb) GetStateStore() ethdb.Database { + if frdb.stateStore != nil { + return frdb.stateStore + } + return frdb +} + func (frdb *freezerdb) SetStateStore(state ethdb.Database) { if frdb.stateStore != nil { frdb.stateStore.Close() @@ -254,6 +263,13 @@ func (db *nofreezedb) SetStateStore(state ethdb.Database) { db.stateStore = state } +func (db *nofreezedb) GetStateStore() ethdb.Database { + if db.stateStore != nil { + return db.stateStore + } + return db +} + func (db *nofreezedb) StateStoreReader() ethdb.Reader { if db.stateStore != nil { return db.stateStore @@ -403,6 +419,7 @@ func (db *emptyfreezedb) Sync() error { func (db *emptyfreezedb) DiffStore() ethdb.KeyValueStore { return db } func (db *emptyfreezedb) SetDiffStore(diff ethdb.KeyValueStore) {} func (db *emptyfreezedb) StateStore() ethdb.Database { return db } +func (db *emptyfreezedb) GetStateStore() ethdb.Database { return db } func (db *emptyfreezedb) SetStateStore(state ethdb.Database) {} func (db *emptyfreezedb) StateStoreReader() ethdb.Reader { return db } func (db *emptyfreezedb) BlockStore() ethdb.Database { return db } @@ -518,8 +535,17 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // Create the idle freezer instance frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly, offset, multiDatabase) + + // We are creating the freezerdb here because the validation logic for db and freezer below requires certain interfaces + // that need a database type. Therefore, we are pre-creating it for subsequent use. + freezerDb := &freezerdb{ + ancientRoot: ancient, + KeyValueStore: db, + AncientStore: frdb, + AncientFreezer: frdb, + } if err != nil { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, err } @@ -555,10 +581,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // the freezer and the key-value store. frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0) if err != nil { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) } else if !bytes.Equal(kvgenesis, frgenesis) { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) } // Key-value store and freezer belong to the same network. Ensure that they @@ -566,7 +592,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 { // Subsequent header after the freezer limit is missing from the database. // Reject startup if the database has a more recent head. - if head := *ReadHeaderNumber(db, ReadHeadHeaderHash(db)); head > frozen-1 { + if head := *ReadHeaderNumber(freezerDb, ReadHeadHeaderHash(freezerDb)); head > frozen-1 { // Find the smallest block stored in the key-value store // in range of [frozen, head] var number uint64 @@ -576,7 +602,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st } } // We are about to exit on error. Print database metadata before exiting - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, fmt.Errorf("gap in the chain between ancients [0 - #%d] and leveldb [#%d - #%d] ", frozen-1, number, head) } @@ -591,11 +617,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // store, otherwise we'll end up missing data. We check block #1 to decide // if we froze anything previously or not, but do take care of databases with // only the genesis block. - if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) { + if ReadHeadHeaderHash(freezerDb) != common.BytesToHash(kvgenesis) { // Key-value store contains more data than the genesis block, make sure we // didn't freeze anything yet. if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 { - printChainMetadata(db) + printChainMetadata(freezerDb) return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") } // Block #1 is still in the database, we're allowed to init a new freezer @@ -617,12 +643,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st frdb.wg.Done() }() } - return &freezerdb{ - ancientRoot: ancient, - KeyValueStore: db, - AncientStore: frdb, - AncientFreezer: frdb, - }, nil + return freezerDb, nil } // NewMemoryDatabase creates an ephemeral in-memory key-value database without a @@ -1238,7 +1259,7 @@ func DeleteTrieState(db ethdb.Database) error { } // printChainMetadata prints out chain metadata to stderr. -func printChainMetadata(db ethdb.KeyValueStore) { +func printChainMetadata(db ethdb.Reader) { fmt.Fprintf(os.Stderr, "Chain metadata\n") for _, v := range ReadChainMetadata(db) { fmt.Fprintf(os.Stderr, " %s\n", strings.Join(v, ": ")) @@ -1249,7 +1270,7 @@ func printChainMetadata(db ethdb.KeyValueStore) { // ReadChainMetadata returns a set of key/value pairs that contains information // about the database chain status. This can be used for diagnostic purposes // when investigating the state of the node. -func ReadChainMetadata(db ethdb.KeyValueStore) [][]string { +func ReadChainMetadata(db ethdb.Reader) [][]string { pp := func(val *uint64) string { if val == nil { return "" @@ -1271,26 +1292,3 @@ func ReadChainMetadata(db ethdb.KeyValueStore) [][]string { } return data } - -func ReadChainMetadataFromMultiDatabase(db ethdb.Database) [][]string { - pp := func(val *uint64) string { - if val == nil { - return "" - } - return fmt.Sprintf("%d (%#x)", *val, *val) - } - data := [][]string{ - {"databaseVersion", pp(ReadDatabaseVersion(db))}, - {"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))}, - {"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db.BlockStore()))}, - {"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))}, - {"lastPivotNumber", pp(ReadLastPivotNumber(db))}, - {"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))}, - {"snapshotDisabled", fmt.Sprintf("%v", ReadSnapshotDisabled(db))}, - {"snapshotJournal", fmt.Sprintf("%d bytes", len(ReadSnapshotJournal(db)))}, - {"snapshotRecoveryNumber", pp(ReadSnapshotRecoveryNumber(db))}, - {"snapshotRoot", fmt.Sprintf("%v", ReadSnapshotRoot(db))}, - {"txIndexTail", pp(ReadTxIndexTail(db))}, - } - return data -} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index c4a029bca..7fc182b7c 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -251,6 +251,10 @@ func (t *table) SetStateStore(state ethdb.Database) { panic("not implement") } +func (t *table) GetStateStore() ethdb.Database { + return nil +} + func (t *table) StateStoreReader() ethdb.Reader { return nil } diff --git a/core/state/sync.go b/core/state/sync.go index 411b54eab..2b07f1b06 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -25,7 +25,7 @@ import ( ) // NewStateSync creates a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.Database, onLeaf func(keys [][]byte, leaf []byte) error, scheme string) *trie.Sync { // Register the storage slot callback if the external callback is specified. var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error if onLeaf != nil { diff --git a/core/state/sync_test.go b/core/state/sync_test.go index adb3ff949..211b0be9f 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -268,7 +268,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool, s } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -369,7 +369,7 @@ func testIterativeDelayedStateSync(t *testing.T, scheme string) { nodeProcessed = len(nodeResults) } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -469,7 +469,7 @@ func testIterativeRandomStateSync(t *testing.T, count int, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -575,7 +575,7 @@ func testIterativeRandomDelayedStateSync(t *testing.T, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -688,7 +688,7 @@ func testIncompleteStateSync(t *testing.T, scheme string) { } } batch := dstDb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 664f77505..09925d7d6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -558,8 +558,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } else { d.ancientLimit = 0 } - frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. - itemAmountInAncient, _ := d.stateDB.ItemAmountInAncient() + frozen, _ := d.stateDB.BlockStore().Ancients() // Ignore the error here since light client can also hit here. + itemAmountInAncient, _ := d.stateDB.BlockStore().ItemAmountInAncient() // If a part of blockchain data has already been written into active store, // disable the ancient style insertion explicitly. if origin >= frozen && itemAmountInAncient != 0 { @@ -1671,9 +1671,9 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { } // Don't report anything until we have a meaningful progress var ( - headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable) - bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable) - receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable) + headerBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerHeaderTable) + bodyBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerBodiesTable) + receiptBytes, _ = d.stateDB.BlockStore().AncientSize(rawdb.ChainFreezerReceiptTable) ) syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes) if syncedBytes == 0 { diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 988a632e5..56d48649d 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -409,8 +409,8 @@ type SyncPeer interface { // - The peer delivers a stale response after a previous timeout // - The peer delivers a refusal to serve the requested state type Syncer struct { - db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup) - scheme string // Node scheme used in node database + db ethdb.Database // Database to store the trie nodes into (and dedup) + scheme string // Node scheme used in node database root common.Hash // Current state trie root being synced tasks []*accountTask // Current account task set being synced @@ -478,7 +478,7 @@ type Syncer struct { // NewSyncer creates a new snapshot syncer to download the Ethereum state over the // snap protocol. -func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { +func NewSyncer(db ethdb.Database, scheme string) *Syncer { return &Syncer{ db: db, scheme: scheme, @@ -719,11 +719,11 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { // cleanPath is used to remove the dangling nodes in the stackTrie. func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) { - if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) { + if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db.StateStoreReader(), path) { rawdb.DeleteAccountTrieNode(batch, path) deletionGauge.Inc(1) } - if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) { + if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db.StateStoreReader(), owner, path) { rawdb.DeleteStorageTrieNode(batch, owner, path) deletionGauge.Inc(1) } @@ -735,6 +735,7 @@ func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) { func (s *Syncer) loadSyncStatus() { var progress SyncProgress + stateDiskDB := s.db.GetStateStore() if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil { if err := json.Unmarshal(status, &progress); err != nil { log.Error("Failed to decode snap sync status", "err", err) @@ -747,7 +748,7 @@ func (s *Syncer) loadSyncStatus() { task := task // closure for task.genBatch in the stacktrie writer callback task.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.accountBytes += common.StorageSize(len(key) + len(value)) }, @@ -773,7 +774,7 @@ func (s *Syncer) loadSyncStatus() { subtask := subtask // closure for subtask.genBatch in the stacktrie writer callback subtask.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -841,7 +842,7 @@ func (s *Syncer) loadSyncStatus() { last = common.MaxHash } batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: stateDiskDB.NewBatch(), OnPut: func(key []byte, value []byte) { s.accountBytes += common.StorageSize(len(key) + len(value)) }, @@ -1894,7 +1895,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { } // Check if the account is a contract with an unknown storage trie if account.Root != types.EmptyRootHash { - if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) { + if !rawdb.HasTrieNode(s.db.StateStoreReader(), res.hashes[i], nil, account.Root, s.scheme) { // If there was a previous large state retrieval in progress, // don't restart it from scratch. This happens if a sync cycle // is interrupted and resumed later. However, *do* update the @@ -1986,12 +1987,25 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask != nil { res.subTask.req = nil } + + var usingMultDatabase bool batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, } + var snapBatch ethdb.HookedBatch + if s.db.StateStore() != nil { + usingMultDatabase = true + snapBatch = ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + } + var ( slots int oldStorageBytes = s.storageBytes @@ -2061,7 +2075,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { } // Our first task is the one that was just filled by this response. batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -2088,7 +2102,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { }) for r.Next() { batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), + Batch: s.db.GetStateStore().NewBatch(), OnPut: func(key []byte, value []byte) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, @@ -2184,8 +2198,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { - rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - + if usingMultDatabase { + rawdb.WriteStorageSnapshot(snapBatch, account, res.hashes[i][j], res.slots[i][j]) + } else { + rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + } // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points if i == len(res.hashes)-1 && res.subTask != nil { @@ -2205,7 +2222,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // If the chunk's root is an overflown but full delivery, // clear the heal request. accountHash := res.accounts[len(res.accounts)-1] - if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) { + if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db.StateStoreReader(), accountHash, nil, root) { for i, account := range res.mainTask.res.hashes { if account == accountHash { res.mainTask.needHeal[i] = false @@ -2225,6 +2242,11 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if err := batch.Write(); err != nil { log.Crit("Failed to persist storage slots", "err", err) } + if usingMultDatabase { + if err := snapBatch.Write(); err != nil { + log.Crit("Failed to persist storage slots", "err", err) + } + } s.storageSynced += uint64(slots) log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes) @@ -2323,12 +2345,25 @@ func (s *Syncer) commitHealer(force bool) { return } batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { + var stateBatch ethdb.Batch + var err error + if s.db.StateStore() != nil { + stateBatch = s.db.StateStore().NewBatch() + err = s.healer.scheduler.Commit(batch, stateBatch) + } else { + err = s.healer.scheduler.Commit(batch, nil) + } + if err != nil { log.Error("Failed to commit healing data", "err", err) } if err := batch.Write(); err != nil { log.Crit("Failed to persist healing data", "err", err) } + if s.db.StateStore() != nil { + if err := stateBatch.Write(); err != nil { + log.Crit("Failed to persist healing data", "err", err) + } + } log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) } diff --git a/ethdb/database.go b/ethdb/database.go index e4bfbc95b..5d7d54353 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -180,12 +180,6 @@ type StateStoreReader interface { StateStoreReader() Reader } -type BlockStore interface { - BlockStore() Database - SetBlockStore(block Database) - HasSeparateBlockStore() bool -} - type BlockStoreReader interface { BlockStoreReader() Reader } @@ -194,6 +188,14 @@ type BlockStoreWriter interface { BlockStoreWriter() Writer } +// MultiDatabaseReader contains the methods required to read data from both key-value as well as +// blockStore or stateStore. +type MultiDatabaseReader interface { + KeyValueReader + StateStoreReader + BlockStoreReader +} + // Reader contains the methods required to read data from both key-value as well as // immutable ancient data. type Reader interface { @@ -234,6 +236,13 @@ type DiffStore interface { type StateStore interface { StateStore() Database SetStateStore(state Database) + GetStateStore() Database +} + +type BlockStore interface { + BlockStore() Database + SetBlockStore(block Database) + HasSeparateBlockStore() bool } // Database contains all the methods required by the high level database to not diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index 2a939f9a1..fa2456e3e 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -39,6 +39,9 @@ var ( // errSnapshotReleased is returned if callers want to retrieve data from a // released snapshot. errSnapshotReleased = errors.New("snapshot released") + + // errNotSupported is returned if the database doesn't support the required operation. + errNotSupported = errors.New("this operation is not supported") ) // Database is an ephemeral key-value store. Apart from basic data storage @@ -47,6 +50,84 @@ var ( type Database struct { db map[string][]byte lock sync.RWMutex + + stateStore ethdb.Database + blockStore ethdb.Database +} + +func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateHead(n uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateTail(n uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Sync() error { + //TODO implement me + panic("implement me") +} + +func (db *Database) TruncateTableTail(kind string, tail uint64) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) ResetTable(kind string, startAt uint64, onlyEmpty bool) error { + //TODO implement me + panic("implement me") +} + +func (db *Database) HasAncient(kind string, number uint64) (bool, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Ancient(kind string, number uint64) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Ancients() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) Tail() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientSize(kind string) (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) ItemAmountInAncient() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) AncientOffSet() uint64 { + //TODO implement me + panic("implement me") +} + +func (db *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + //TODO implement me + panic("implement me") } // New returns a wrapped map with all the required database interface methods @@ -204,6 +285,37 @@ func (db *Database) Len() int { return len(db.db) } +func (db *Database) StateStoreReader() ethdb.Reader { + if db.stateStore == nil { + return db + } + return db.stateStore +} + +func (db *Database) BlockStoreReader() ethdb.Reader { + if db.blockStore == nil { + return db + } + return db.blockStore +} + +func (db *Database) BlockStoreWriter() ethdb.Writer { + if db.blockStore == nil { + return db + } + return db.blockStore +} + +// convertLegacyFn takes a raw freezer entry in an older format and +// returns it in the new format. +type convertLegacyFn = func([]byte) ([]byte, error) + +// MigrateTable processes the entries in a given table in sequence +// converting them to a new format if they're of an old format. +func (db *Database) MigrateTable(kind string, convert convertLegacyFn) error { + return errNotSupported +} + // keyvalue is a key-value tuple tagged with a deletion field to allow creating // memory-database write batches. type keyvalue struct { diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index 2ba5807a3..f755dd16b 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -122,6 +122,10 @@ func (db *Database) SetStateStore(state ethdb.Database) { panic("not supported") } +func (db *Database) GetStateStore() ethdb.Database { + panic("not supported") +} + func (db *Database) StateStoreReader() ethdb.Reader { return db } diff --git a/internal/ethapi/dbapi.go b/internal/ethapi/dbapi.go index 33fda936d..b891091b9 100644 --- a/internal/ethapi/dbapi.go +++ b/internal/ethapi/dbapi.go @@ -33,11 +33,11 @@ func (api *DebugAPI) DbGet(key string) (hexutil.Bytes, error) { // DbAncient retrieves an ancient binary blob from the append-only immutable files. // It is a mapping to the `AncientReaderOp.Ancient` method func (api *DebugAPI) DbAncient(kind string, number uint64) (hexutil.Bytes, error) { - return api.b.ChainDb().Ancient(kind, number) + return api.b.ChainDb().BlockStore().Ancient(kind, number) } // DbAncients returns the ancient item numbers in the ancient store. // It is a mapping to the `AncientReaderOp.Ancients` method func (api *DebugAPI) DbAncients() (uint64, error) { - return api.b.ChainDb().Ancients() + return api.b.ChainDb().BlockStore().Ancients() } diff --git a/node/node.go b/node/node.go index 7d07e576d..069269de0 100644 --- a/node/node.go +++ b/node/node.go @@ -74,11 +74,11 @@ const ( initializingState = iota runningState closedState - blockDbCacheSize = 256 - blockDbHandlesMinSize = 1000 - blockDbHandlesMaxSize = 2000 - chainDbMemoryPercentage = 50 - chainDbHandlesPercentage + blockDbCacheSize = 256 + blockDbHandlesMinSize = 1000 + blockDbHandlesMaxSize = 2000 + chainDbMemoryPercentage = 50 + chainDbHandlesPercentage = 50 diffStoreHandlesPercentage = 20 ) @@ -791,14 +791,15 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool, config *ethconfig.Config) (ethdb.Database, error) { var ( - err error - stateDiskDb ethdb.Database - blockDb ethdb.Database - disableChainDbFreeze = false - blockDbHandlesSize int - diffStoreHandles int - chainDataHandles = config.DatabaseHandles - chainDbCache = config.DatabaseCache + err error + stateDiskDb ethdb.Database + blockDb ethdb.Database + disableChainDbFreeze = false + blockDbHandlesSize int + diffStoreHandles int + chainDataHandles = config.DatabaseHandles + chainDbCache = config.DatabaseCache + stateDbCache, stateDbHandles int ) if config.PersistDiff { @@ -818,10 +819,17 @@ func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool } else { blockDbHandlesSize = blockDbHandlesMinSize } - stateDbCache := config.DatabaseCache - chainDbCache - blockDbCacheSize - stateDbHandles := config.DatabaseHandles - chainDataHandles - blockDbHandlesSize + stateDbCache = config.DatabaseCache - chainDbCache - blockDbCacheSize + stateDbHandles = config.DatabaseHandles - chainDataHandles - blockDbHandlesSize disableChainDbFreeze = true + } + chainDB, err := n.OpenDatabaseWithFreezer(name, chainDbCache, chainDataHandles, config.DatabaseFreezer, namespace, readonly, disableChainDbFreeze, false, config.PruneAncientData) + if err != nil { + return nil, err + } + + if isMultiDatabase { // Allocate half of the handles and chainDbCache to this separate state data database stateDiskDb, err = n.OpenDatabaseWithFreezer(name+"/state", stateDbCache, stateDbHandles, "", "eth/db/statedata/", readonly, true, false, config.PruneAncientData) if err != nil { @@ -833,14 +841,6 @@ func (n *Node) OpenAndMergeDatabase(name string, namespace string, readonly bool return nil, err } log.Warn("Multi-database is an experimental feature") - } - - chainDB, err := n.OpenDatabaseWithFreezer(name, chainDbCache, chainDataHandles, config.DatabaseFreezer, namespace, readonly, disableChainDbFreeze, false, config.PruneAncientData) - if err != nil { - return nil, err - } - - if isMultiDatabase { chainDB.SetStateStore(stateDiskDb) chainDB.SetBlockStore(blockDb) } diff --git a/trie/sync.go b/trie/sync.go index 589d28364..5c74dfcc0 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -229,7 +229,7 @@ func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) { // and reconstructs the trie step by step until all is done. type Sync struct { scheme string // Node scheme descriptor used in database. - database ethdb.KeyValueReader // Persistent database to check for existing entries + database ethdb.Database // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequent database writes nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash @@ -238,7 +238,7 @@ type Sync struct { } // NewSync creates a new trie data download scheduler. -func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, scheme string) *Sync { +func NewSync(root common.Hash, database ethdb.Database, callback LeafCallback, scheme string) *Sync { ts := &Sync{ scheme: scheme, database: database, @@ -420,7 +420,7 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { // Commit flushes the data stored in the internal membatch out to persistent // storage, returning any occurred error. The whole data set will be flushed // in an atomic database batch. -func (s *Sync) Commit(dbw ethdb.Batch) error { +func (s *Sync) Commit(dbw ethdb.Batch, stateBatch ethdb.Batch) error { // Flush the pending node writes into database batch. var ( account int @@ -430,9 +430,17 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { if op.isDelete() { // node deletion is only supported in path mode. if op.owner == (common.Hash{}) { - rawdb.DeleteAccountTrieNode(dbw, op.path) + if stateBatch != nil { + rawdb.DeleteAccountTrieNode(stateBatch, op.path) + } else { + rawdb.DeleteAccountTrieNode(dbw, op.path) + } } else { - rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path) + if stateBatch != nil { + rawdb.DeleteStorageTrieNode(stateBatch, op.owner, op.path) + } else { + rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path) + } } deletionGauge.Inc(1) } else { @@ -441,7 +449,11 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { } else { storage += 1 } - rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme) + if stateBatch != nil { + rawdb.WriteTrieNode(stateBatch, op.owner, op.path, op.hash, op.blob, s.scheme) + } else { + rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme) + } } } accountNodeSyncedGauge.Inc(int64(account)) @@ -546,9 +558,9 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // the performance impact negligible. var exists bool if owner == (common.Hash{}) { - exists = rawdb.ExistsAccountTrieNode(s.database, append(inner, key[:i]...)) + exists = rawdb.ExistsAccountTrieNode(s.database.StateStoreReader(), append(inner, key[:i]...)) } else { - exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...)) + exists = rawdb.ExistsStorageTrieNode(s.database.StateStoreReader(), owner, append(inner, key[:i]...)) } if exists { s.membatch.delNode(owner, append(inner, key[:i]...)) @@ -687,15 +699,15 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error { func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) { // If node is running with hash scheme, check the presence with node hash. if s.scheme == rawdb.HashScheme { - return rawdb.HasLegacyTrieNode(s.database, hash), false + return rawdb.HasLegacyTrieNode(s.database.StateStoreReader(), hash), false } // If node is running with path scheme, check the presence with node path. var blob []byte var dbHash common.Hash if owner == (common.Hash{}) { - blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path) + blob, dbHash = rawdb.ReadAccountTrieNode(s.database.StateStoreReader(), path) } else { - blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path) + blob, dbHash = rawdb.ReadStorageTrieNode(s.database.StateStoreReader(), owner, path) } exists = hash == dbHash inconsistent = !exists && len(blob) != 0 diff --git a/trie/sync_test.go b/trie/sync_test.go index 7bc68c041..df2c1f59f 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/trie/trienode" ) @@ -143,7 +142,7 @@ func TestEmptySync(t *testing.T) { emptyD, _ := New(TrieID(types.EmptyRootHash), dbD) for i, trie := range []*Trie{emptyA, emptyB, emptyC, emptyD} { - sync := NewSync(trie.Hash(), memorydb.New(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme()) + sync := NewSync(trie.Hash(), rawdb.NewMemoryDatabase(), nil, []*testDb{dbA, dbB, dbC, dbD}[i].Scheme()) if paths, nodes, codes := sync.Missing(1); len(paths) != 0 || len(nodes) != 0 || len(codes) != 0 { t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, paths, nodes, codes) } @@ -212,7 +211,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -278,7 +277,7 @@ func testIterativeDelayedSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -348,7 +347,7 @@ func testIterativeRandomSync(t *testing.T, count int, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -419,7 +418,7 @@ func testIterativeRandomDelayedSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -491,7 +490,7 @@ func testDuplicateAvoidanceSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -563,7 +562,7 @@ func testIncompleteSync(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -653,7 +652,7 @@ func testSyncOrdering(t *testing.T, scheme string) { } } batch := diskdb.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } batch.Write() @@ -723,7 +722,7 @@ func syncWithHookWriter(t *testing.T, root common.Hash, db ethdb.Database, srcDb } } batch := db.NewBatch() - if err := sched.Commit(batch); err != nil { + if err := sched.Commit(batch, nil); err != nil { t.Fatalf("failed to commit data: %v", err) } if hookWriter != nil {