2ce00adb55
* focus on performance improvement in many aspects

  1. Do BlockBody verification concurrently.
  2. Do calculation of the intermediate root concurrently.
  3. Preload accounts before processing blocks.
  4. Make the snapshot layers configurable.
  5. Reuse some objects to reduce GC.

* rlp: improve decoder stream implementation (#22858)

  This commit makes various cleanup changes to rlp.Stream.

* rlp: shrink Stream struct

  This removes a lot of unused padding space in Stream by reordering the
  fields. The size of Stream changes from 120 bytes to 88 bytes. Stream
  instances are internally cached and reused using sync.Pool, so this does
  not improve performance.

* rlp: simplify list stack

  The list stack kept track of the size of the current list context as well
  as the current offset into it. The size had to be stored in the stack in
  order to subtract it from the remaining bytes of any enclosing list in
  ListEnd. It seems that this can be implemented in a simpler way: just
  subtract the size from the enclosing list context in List instead.

* rlp: use atomic.Value for type cache (#22902)

  All encoding/decoding operations read the type cache to find the
  writer/decoder function responsible for a type. When analyzing CPU
  profiles of geth during sync, I found that the use of sync.RWMutex in
  cache lookups appears in the profiles. It seems we are running into CPU
  cache contention problems when package rlp is heavily used on all CPU
  cores during sync.

  This change makes it use atomic.Value + a writer lock instead of
  sync.RWMutex. In the common case where the typeinfo entry is present in
  the cache, we simply fetch the map and look up the type.

* rlp: optimize byte array handling (#22924)

  This change improves the performance of encoding/decoding [N]byte.

  name                     old time/op    new time/op    delta
  DecodeByteArrayStruct-8     336ns ± 0%     246ns ± 0%  -26.98%  (p=0.000 n=9+10)
  EncodeByteArrayStruct-8     225ns ± 1%     148ns ± 1%  -34.12%  (p=0.000 n=10+10)

  name                     old alloc/op   new alloc/op   delta
  DecodeByteArrayStruct-8      120B ± 0%       48B ± 0%  -60.00%  (p=0.000 n=10+10)
  EncodeByteArrayStruct-8     0.00B          0.00B          ~     (all equal)

* rlp: optimize big.Int decoding for size <= 32 bytes (#22927)

  This change grows the static integer buffer in Stream to 32 bytes, making
  it possible to decode 256-bit integers without allocating a temporary
  buffer. In the recent commit 088da24, the Stream struct size decreased
  from 120 bytes down to 88 bytes. This commit grows the struct to 112 bytes
  again, but the size change will not degrade performance because Stream
  instances are internally cached in sync.Pool.

  name             old time/op   new time/op   delta
  DecodeBigInts-8   12.2µs ± 0%    8.6µs ± 4%  -29.58%  (p=0.000 n=9+10)

  name             old speed     new speed     delta
  DecodeBigInts-8  230MB/s ± 0%  326MB/s ± 4%  +42.04%  (p=0.000 n=9+10)

* eth/protocols/eth, les: avoid Raw() when decoding HashOrNumber (#22841)

  Getting the raw value is not necessary to decode this type, and decoding
  it directly from the stream is faster.

* fix testcase

* debug no lazy

* fix can not repair

* address comments

Co-authored-by: Felix Lange <fjl@twurst.com>
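
Of the rlp changes above, the type-cache one (#22902) is the easiest to show in isolation: readers fetch the current map through atomic.Value with no lock at all, while writers copy the map under an ordinary mutex and atomically publish the enlarged copy. Below is a minimal, self-contained sketch of that read-mostly pattern; the names (typeCache, typeinfo, generate) are illustrative stand-ins rather than the actual rlp internals.

package main

import (
    "fmt"
    "reflect"
    "sync"
    "sync/atomic"
)

// typeinfo stands in for the per-type writer/decoder metadata.
type typeinfo struct{ name string }

type typeCache struct {
    cur atomic.Value // holds a map[reflect.Type]*typeinfo, read without locking
    mu  sync.Mutex   // serializes writers only
}

func newTypeCache() *typeCache {
    c := new(typeCache)
    c.cur.Store(make(map[reflect.Type]*typeinfo))
    return c
}

// info is the hot path: one atomic load plus a plain map lookup.
func (c *typeCache) info(t reflect.Type) *typeinfo {
    if info := c.cur.Load().(map[reflect.Type]*typeinfo)[t]; info != nil {
        return info
    }
    return c.generate(t)
}

// generate is the slow path: copy-on-write under the writer lock.
func (c *typeCache) generate(t reflect.Type) *typeinfo {
    c.mu.Lock()
    defer c.mu.Unlock()

    cur := c.cur.Load().(map[reflect.Type]*typeinfo)
    if info := cur[t]; info != nil {
        return info // another goroutine generated it meanwhile
    }
    next := make(map[reflect.Type]*typeinfo, len(cur)+1)
    for k, v := range cur {
        next[k] = v
    }
    info := &typeinfo{name: t.String()}
    next[t] = info
    c.cur.Store(next) // publish the grown map atomically
    return info
}

func main() {
    c := newTypeCache()
    fmt.Println(c.info(reflect.TypeOf(uint64(0))).name) // uint64, via the slow path
    fmt.Println(c.info(reflect.TypeOf(uint64(0))).name) // uint64, via the lock-free path
}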

895 lines
31 KiB
Go

// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
    "errors"
    "fmt"
    "io"
    "reflect"
    "runtime"
    "sync"
    "time"

    "github.com/VictoriaMetrics/fastcache"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/rlp"
)

var (
    memcacheCleanHitMeter   = metrics.NewRegisteredMeter("trie/memcache/clean/hit", nil)
    memcacheCleanMissMeter  = metrics.NewRegisteredMeter("trie/memcache/clean/miss", nil)
    memcacheCleanReadMeter  = metrics.NewRegisteredMeter("trie/memcache/clean/read", nil)
    memcacheCleanWriteMeter = metrics.NewRegisteredMeter("trie/memcache/clean/write", nil)

    memcacheDirtyHitMeter   = metrics.NewRegisteredMeter("trie/memcache/dirty/hit", nil)
    memcacheDirtyMissMeter  = metrics.NewRegisteredMeter("trie/memcache/dirty/miss", nil)
    memcacheDirtyReadMeter  = metrics.NewRegisteredMeter("trie/memcache/dirty/read", nil)
    memcacheDirtyWriteMeter = metrics.NewRegisteredMeter("trie/memcache/dirty/write", nil)

    memcacheFlushTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil)
    memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil)
    memcacheFlushSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil)

    memcacheGCTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/gc/time", nil)
    memcacheGCNodesMeter = metrics.NewRegisteredMeter("trie/memcache/gc/nodes", nil)
    memcacheGCSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/gc/size", nil)

    memcacheCommitTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/commit/time", nil)
    memcacheCommitNodesMeter = metrics.NewRegisteredMeter("trie/memcache/commit/nodes", nil)
    memcacheCommitSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/commit/size", nil)
)

// Database is an intermediate write layer between the trie data structures and
// the disk database. The aim is to accumulate trie writes in-memory and only
// periodically flush a couple tries to disk, garbage collecting the remainder.
//
// Note, the trie Database is **not** thread safe in its mutations, but it **is**
// thread safe in providing individual, independent node access. The rationale
// behind this split design is to provide read access to RPC handlers and sync
// servers even while the trie is executing expensive garbage collection.
type Database struct {
    diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes

    cleans  *fastcache.Cache            // GC friendly memory cache of clean node RLPs
    dirties map[common.Hash]*cachedNode // Data and references relationships of dirty trie nodes
    oldest  common.Hash                 // Oldest tracked node, flush-list head
    newest  common.Hash                 // Newest tracked node, flush-list tail

    preimages map[common.Hash][]byte // Preimages of nodes from the secure trie

    gctime  time.Duration      // Time spent on garbage collection since last commit
    gcnodes uint64             // Nodes garbage collected since last commit
    gcsize  common.StorageSize // Data storage garbage collected since last commit

    flushtime  time.Duration      // Time spent on data flushing since last commit
    flushnodes uint64             // Nodes flushed since last commit
    flushsize  common.StorageSize // Data storage flushed since last commit

    dirtiesSize   common.StorageSize // Storage size of the dirty node cache (exc. metadata)
    childrenSize  common.StorageSize // Storage size of the external children tracking
    preimagesSize common.StorageSize // Storage size of the preimages cache

    // Rough size metrics, maintained under a lightweight lock so that Size()
    // does not contend with the main mutation lock.
    sizeLock           sync.RWMutex
    roughPreimagesSize common.StorageSize
    roughDirtiesSize   common.StorageSize

    lock sync.RWMutex
}

// rawNode is a simple binary blob used to differentiate between collapsed trie
// nodes and already encoded RLP binary blobs (while at the same time storing
// them in the same cache fields).
type rawNode []byte

func (n rawNode) cache() (hashNode, bool)   { panic("this should never end up in a live trie") }
func (n rawNode) fstring(ind string) string { panic("this should never end up in a live trie") }

func (n rawNode) EncodeRLP(w io.Writer) error {
    _, err := w.Write(n)
    return err
}

// rawFullNode represents only the useful data content of a full node, with the
// caches and flags stripped out to minimize its data storage. This type honors
// the same RLP encoding as the original parent.
type rawFullNode [17]node

func (n rawFullNode) cache() (hashNode, bool)   { panic("this should never end up in a live trie") }
func (n rawFullNode) fstring(ind string) string { panic("this should never end up in a live trie") }

func (n rawFullNode) EncodeRLP(w io.Writer) error {
    var nodes [17]node

    for i, child := range n {
        if child != nil {
            nodes[i] = child
        } else {
            nodes[i] = nilValueNode
        }
    }
    return rlp.Encode(w, nodes)
}

// rawShortNode represents only the useful data content of a short node, with the
// caches and flags stripped out to minimize its data storage. This type honors
// the same RLP encoding as the original parent.
type rawShortNode struct {
    Key []byte
    Val node
}

func (n rawShortNode) cache() (hashNode, bool)   { panic("this should never end up in a live trie") }
func (n rawShortNode) fstring(ind string) string { panic("this should never end up in a live trie") }

// cachedNode is all the information we know about a single cached trie node
// in the memory database write layer.
type cachedNode struct {
    node node   // Cached collapsed trie node, or raw rlp data
    size uint16 // Byte size of the useful cached data

    parents  uint32                 // Number of live nodes referencing this one
    children map[common.Hash]uint16 // External children referenced by this node

    flushPrev common.Hash // Previous node in the flush-list
    flushNext common.Hash // Next node in the flush-list
}

// cachedNodeSize is the raw size of a cachedNode data structure without any
// node data included. It's an approximate size, but should be a lot better
// than not counting them.
var cachedNodeSize = int(reflect.TypeOf(cachedNode{}).Size())

// cachedNodeChildrenSize is the raw size of an initialized but empty external
// reference map.
const cachedNodeChildrenSize = 48

// rlp returns the raw rlp encoded blob of the cached trie node, either directly
// from the cache, or by regenerating it from the collapsed node.
func (n *cachedNode) rlp() []byte {
    if node, ok := n.node.(rawNode); ok {
        return node
    }
    blob, err := rlp.EncodeToBytes(n.node)
    if err != nil {
        panic(err)
    }
    return blob
}

// obj returns the decoded and expanded trie node, either directly from the cache,
// or by regenerating it from the rlp encoded blob.
func (n *cachedNode) obj(hash common.Hash) node {
    if node, ok := n.node.(rawNode); ok {
        return mustDecodeNode(hash[:], node)
    }
    return expandNode(hash[:], n.node)
}

// forChilds invokes the callback for all the tracked children of this node,
// both the implicit ones from inside the node as well as the explicit ones
// from outside the node.
func (n *cachedNode) forChilds(onChild func(hash common.Hash)) {
    for child := range n.children {
        onChild(child)
    }
    if _, ok := n.node.(rawNode); !ok {
        forGatherChildren(n.node, onChild)
    }
}

// forGatherChildren traverses the node hierarchy of a collapsed storage node and
// invokes the callback for all the hashnode children.
func forGatherChildren(n node, onChild func(hash common.Hash)) {
    switch n := n.(type) {
    case *rawShortNode:
        forGatherChildren(n.Val, onChild)
    case rawFullNode:
        for i := 0; i < 16; i++ {
            forGatherChildren(n[i], onChild)
        }
    case hashNode:
        onChild(common.BytesToHash(n))
    case valueNode, nil, rawNode:
    default:
        panic(fmt.Sprintf("unknown node type: %T", n))
    }
}

// simplifyNode traverses the hierarchy of an expanded memory node and discards
// all the internal caches, returning a node that only contains the raw data.
func simplifyNode(n node) node {
    switch n := n.(type) {
    case *shortNode:
        // Short nodes discard the flags and cascade
        return &rawShortNode{Key: n.Key, Val: simplifyNode(n.Val)}

    case *fullNode:
        // Full nodes discard the flags and cascade
        node := rawFullNode(n.Children)
        for i := 0; i < len(node); i++ {
            if node[i] != nil {
                node[i] = simplifyNode(node[i])
            }
        }
        return node

    case valueNode, hashNode, rawNode:
        return n

    default:
        panic(fmt.Sprintf("unknown node type: %T", n))
    }
}

// expandNode traverses the node hierarchy of a collapsed storage node and converts
// all fields and keys into expanded memory form.
func expandNode(hash hashNode, n node) node {
    switch n := n.(type) {
    case *rawShortNode:
        // Short nodes need key and child expansion
        return &shortNode{
            Key: compactToHex(n.Key),
            Val: expandNode(nil, n.Val),
            flags: nodeFlag{
                hash: hash,
            },
        }

    case rawFullNode:
        // Full nodes need child expansion
        node := &fullNode{
            flags: nodeFlag{
                hash: hash,
            },
        }
        for i := 0; i < len(node.Children); i++ {
            if n[i] != nil {
                node.Children[i] = expandNode(nil, n[i])
            }
        }
        return node

    case valueNode, hashNode:
        return n

    default:
        panic(fmt.Sprintf("unknown node type: %T", n))
    }
}

// Config defines all necessary options for the database.
type Config struct {
    Cache     int    // Memory allowance (MB) to use for caching trie nodes in memory
    Journal   string // Journal of clean cache to survive node restarts
    Preimages bool   // Flag whether the preimage of trie key is recorded
}

// NewDatabase creates a new trie database to store ephemeral trie content before
// it's written out to disk or garbage collected. No read cache is created, so all
// data retrievals will hit the underlying disk database.
func NewDatabase(diskdb ethdb.KeyValueStore) *Database {
    return NewDatabaseWithConfig(diskdb, nil)
}

// NewDatabaseWithConfig creates a new trie database to store ephemeral trie content
// before it's written out to disk or garbage collected. It also acts as a read cache
// for nodes loaded from disk.
func NewDatabaseWithConfig(diskdb ethdb.KeyValueStore, config *Config) *Database {
    var cleans *fastcache.Cache
    if config != nil && config.Cache > 0 {
        if config.Journal == "" {
            cleans = fastcache.New(config.Cache * 1024 * 1024)
        } else {
            cleans = fastcache.LoadFromFileOrNew(config.Journal, config.Cache*1024*1024)
        }
    }
    db := &Database{
        diskdb: diskdb,
        cleans: cleans,
        // Seed the dirty cache with the "metaroot": the empty-hash entry that
        // standalone tries are referenced against.
        dirties: map[common.Hash]*cachedNode{{}: {
            children: make(map[common.Hash]uint16),
        }},
    }
    if config == nil || config.Preimages { // TODO(karalabe): Flip to default off in the future
        db.preimages = make(map[common.Hash][]byte)
    }
    return db
}
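
// Illustrative usage sketch: how a chain processor typically drives this
// database. `diskdb`, `root` and `parent` are placeholders, and the values
// are arbitrary:
//
//     triedb := NewDatabaseWithConfig(diskdb, &Config{
//         Cache:     512,  // MB of clean-node cache
//         Journal:   "",   // set a path to persist the clean cache across restarts
//         Preimages: true, // record trie key preimages
//     })
//     triedb.Reference(root, common.Hash{}) // anchor a new trie at the metaroot
//     _ = triedb.Cap(256 * 1024 * 1024)     // flush old nodes until below ~256MB
//     triedb.Dereference(parent)            // garbage collect a stale trie
//     _ = triedb.Commit(root, true, nil)    // persist everything reachable from root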

// DiskDB retrieves the persistent storage backing the trie database.
func (db *Database) DiskDB() ethdb.KeyValueStore {
    return db.diskdb
}

// insert inserts a collapsed trie node into the memory database.
// The blob size must be specified to allow proper size tracking.
// All nodes inserted by this function will be reference tracked
// and in theory should only be used for **trie node** insertion.
func (db *Database) insert(hash common.Hash, size int, node node) {
    // If the node's already cached, skip
    if _, ok := db.dirties[hash]; ok {
        return
    }
    memcacheDirtyWriteMeter.Mark(int64(size))

    // Create the cached entry for this node
    entry := &cachedNode{
        node:      simplifyNode(node),
        size:      uint16(size),
        flushPrev: db.newest,
    }
    entry.forChilds(func(child common.Hash) {
        if c := db.dirties[child]; c != nil {
            c.parents++
        }
    })
    db.dirties[hash] = entry

    // Update the flush-list endpoints
    if db.oldest == (common.Hash{}) {
        db.oldest, db.newest = hash, hash
    } else {
        db.dirties[db.newest].flushNext, db.newest = hash, hash
    }
    db.dirtiesSize += common.StorageSize(common.HashLength + entry.size)
}
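
// The dirty nodes inserted above are threaded into a doubly-linked "flush-list"
// through the cachedNode.flushPrev/flushNext fields, ordered from db.oldest to
// db.newest:
//
//     db.oldest -> n1 <-> n2 <-> ... <-> nk <- db.newest
//
// insert appends at the newest end; Cap further below persists and unlinks
// nodes from the oldest end until memory usage drops under the requested limit.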

// insertPreimage writes a new trie node pre-image to the memory database if it's
// yet unknown. The method will NOT make a copy of the slice, so only use it if
// the preimage will NOT be changed later on.
//
// Note, this method assumes that the database's lock is held!
func (db *Database) insertPreimage(hash common.Hash, preimage []byte) {
    // Short circuit if preimage collection is disabled
    if db.preimages == nil {
        return
    }
    // Track the preimage if a yet unknown one
    if _, ok := db.preimages[hash]; ok {
        return
    }
    db.preimages[hash] = preimage
    db.preimagesSize += common.StorageSize(common.HashLength + len(preimage))
}

// node retrieves a cached trie node from memory, or returns nil if none can be
// found in the memory cache.
func (db *Database) node(hash common.Hash) node {
    // Retrieve the node from the clean cache if available
    if db.cleans != nil {
        if enc := db.cleans.Get(nil, hash[:]); enc != nil {
            memcacheCleanHitMeter.Mark(1)
            memcacheCleanReadMeter.Mark(int64(len(enc)))
            return mustDecodeNode(hash[:], enc)
        }
    }
    // Retrieve the node from the dirty cache if available
    db.lock.RLock()
    dirty := db.dirties[hash]
    db.lock.RUnlock()

    if dirty != nil {
        memcacheDirtyHitMeter.Mark(1)
        memcacheDirtyReadMeter.Mark(int64(dirty.size))
        return dirty.obj(hash)
    }
    memcacheDirtyMissMeter.Mark(1)

    // Content unavailable in memory, attempt to retrieve from disk
    enc, err := db.diskdb.Get(hash[:])
    if err != nil || enc == nil {
        return nil
    }
    if db.cleans != nil {
        db.cleans.Set(hash[:], enc)
        memcacheCleanMissMeter.Mark(1)
        memcacheCleanWriteMeter.Mark(int64(len(enc)))
    }
    return mustDecodeNode(hash[:], enc)
}

// Node retrieves an encoded cached trie node from memory. If it cannot be found
// cached, the method queries the persistent database for the content.
func (db *Database) Node(hash common.Hash) ([]byte, error) {
    // It doesn't make sense to retrieve the metaroot
    if hash == (common.Hash{}) {
        return nil, errors.New("not found")
    }
    // Retrieve the node from the clean cache if available
    if db.cleans != nil {
        if enc := db.cleans.Get(nil, hash[:]); enc != nil {
            memcacheCleanHitMeter.Mark(1)
            memcacheCleanReadMeter.Mark(int64(len(enc)))
            return enc, nil
        }
    }
    // Retrieve the node from the dirty cache if available
    db.lock.RLock()
    dirty := db.dirties[hash]
    db.lock.RUnlock()

    if dirty != nil {
        memcacheDirtyHitMeter.Mark(1)
        memcacheDirtyReadMeter.Mark(int64(dirty.size))
        return dirty.rlp(), nil
    }
    memcacheDirtyMissMeter.Mark(1)

    // Content unavailable in memory, attempt to retrieve from disk
    enc := rawdb.ReadTrieNode(db.diskdb, hash)
    if len(enc) != 0 {
        if db.cleans != nil {
            db.cleans.Set(hash[:], enc)
            memcacheCleanMissMeter.Mark(1)
            memcacheCleanWriteMeter.Mark(int64(len(enc)))
        }
        return enc, nil
    }
    return nil, errors.New("not found")
}

// preimage retrieves a cached trie node pre-image from memory. If it cannot be
// found cached, the method queries the persistent database for the content.
func (db *Database) preimage(hash common.Hash) []byte {
    // Short circuit if preimage collection is disabled
    if db.preimages == nil {
        return nil
    }
    // Retrieve the node from cache if available
    db.lock.RLock()
    preimage := db.preimages[hash]
    db.lock.RUnlock()

    if preimage != nil {
        return preimage
    }
    return rawdb.ReadPreimage(db.diskdb, hash)
}

// Nodes retrieves the hashes of all the nodes cached within the memory database.
// This method is extremely expensive and should only be used to validate internal
// states in test code.
func (db *Database) Nodes() []common.Hash {
    db.lock.RLock()
    defer db.lock.RUnlock()

    var hashes = make([]common.Hash, 0, len(db.dirties))
    for hash := range db.dirties {
        if hash != (common.Hash{}) { // Special case for "root" references/nodes
            hashes = append(hashes, hash)
        }
    }
    return hashes
}

// Reference adds a new reference from a parent node to a child node. This
// function is used to add a reference between an internal trie node and an
// external node (e.g. a storage trie root); all internal trie nodes are
// referenced together by the database itself.
func (db *Database) Reference(child common.Hash, parent common.Hash) {
    db.lock.Lock()
    db.reference(child, parent)

    // Recompute the rough cache sizes (the same accounting as Size, minus the
    // metaroot's metadata) while still holding the main lock.
    var roughDirtiesSize = common.StorageSize((len(db.dirties)-1)*cachedNodeSize) + db.dirtiesSize +
        db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2))
    var roughPreimagesSize = db.preimagesSize
    db.lock.Unlock()

    db.sizeLock.Lock()
    db.roughDirtiesSize = roughDirtiesSize
    db.roughPreimagesSize = roughPreimagesSize
    db.sizeLock.Unlock()
}

// reference is the private locked version of Reference.
func (db *Database) reference(child common.Hash, parent common.Hash) {
    // If the node does not exist, it's a node pulled from disk, skip
    node, ok := db.dirties[child]
    if !ok {
        return
    }
    // If the reference already exists, only duplicate for roots
    if db.dirties[parent].children == nil {
        db.dirties[parent].children = make(map[common.Hash]uint16)
        db.childrenSize += cachedNodeChildrenSize
    } else if _, ok = db.dirties[parent].children[child]; ok && parent != (common.Hash{}) {
        return
    }
    node.parents++
    db.dirties[parent].children[child]++
    if db.dirties[parent].children[child] == 1 {
        db.childrenSize += common.HashLength + 2 // uint16 counter
    }
}

// Dereference removes an existing reference from a root node.
func (db *Database) Dereference(root common.Hash) {
    // Sanity check to ensure that the meta-root is not removed
    if root == (common.Hash{}) {
        log.Error("Attempted to dereference the trie cache meta root")
        return
    }
    db.lock.Lock()
    defer db.lock.Unlock()

    nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
    db.dereference(root, common.Hash{})

    db.gcnodes += uint64(nodes - len(db.dirties))
    db.gcsize += storage - db.dirtiesSize
    db.gctime += time.Since(start)

    memcacheGCTimeTimer.Update(time.Since(start))
    memcacheGCSizeMeter.Mark(int64(storage - db.dirtiesSize))
    memcacheGCNodesMeter.Mark(int64(nodes - len(db.dirties)))

    log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)
}

// dereference is the private locked version of Dereference.
func (db *Database) dereference(child common.Hash, parent common.Hash) {
    // Dereference the parent-child
    node := db.dirties[parent]

    if node.children != nil && node.children[child] > 0 {
        node.children[child]--
        if node.children[child] == 0 {
            delete(node.children, child)
            db.childrenSize -= (common.HashLength + 2) // uint16 counter
        }
    }
    // If the child does not exist, it's a previously committed node.
    node, ok := db.dirties[child]
    if !ok {
        return
    }
    // If there are no more references to the child, delete it and cascade
    if node.parents > 0 {
        // This is a special corner case where a node loaded from disk (i.e. not in the
        // memcache any more) gets reinjected as a new node (short node split into full,
        // then reverted into short), causing a cached node to have no parents. That is
        // no problem in itself, but don't make maxint parents out of it.
        node.parents--
    }
    if node.parents == 0 {
        // Remove the node from the flush-list
        switch child {
        case db.oldest:
            db.oldest = node.flushNext
            db.dirties[node.flushNext].flushPrev = common.Hash{}
        case db.newest:
            db.newest = node.flushPrev
            db.dirties[node.flushPrev].flushNext = common.Hash{}
        default:
            db.dirties[node.flushPrev].flushNext = node.flushNext
            db.dirties[node.flushNext].flushPrev = node.flushPrev
        }
        // Dereference all children and delete the node
        node.forChilds(func(hash common.Hash) {
            db.dereference(hash, child)
        })
        delete(db.dirties, child)
        db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
        if node.children != nil {
            db.childrenSize -= cachedNodeChildrenSize
        }
    }
}

// Cap iteratively flushes old but still referenced trie nodes until the total
// memory usage goes below the given threshold.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Cap(limit common.StorageSize) error {
    // Create a database batch to flush persistent data out. It is important that
    // outside code doesn't see an inconsistent state (referenced data removed from
    // memory cache during commit but not yet in persistent storage). This is ensured
    // by only uncaching existing data when the database write finalizes.
    nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
    batch := db.diskdb.NewBatch()

    // db.dirtiesSize only contains the useful data in the cache, but when reporting
    // the total memory consumption, the maintenance metadata also needs to be
    // counted.
    size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize)
    size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2))

    // If the preimage cache got large enough, push to disk. If it's still small,
    // leave it for later to deduplicate writes.
    flushPreimages := db.preimagesSize > 4*1024*1024
    if flushPreimages {
        if db.preimages == nil {
            log.Error("Attempted to write preimages whilst disabled")
        } else {
            rawdb.WritePreimages(batch, db.preimages)
            if batch.ValueSize() > ethdb.IdealBatchSize {
                if err := batch.Write(); err != nil {
                    return err
                }
                batch.Reset()
            }
        }
    }
    // Keep committing nodes from the flush-list until we're below allowance
    oldest := db.oldest
    for size > limit && oldest != (common.Hash{}) {
        // Fetch the oldest referenced node and push into the batch
        node := db.dirties[oldest]
        rawdb.WriteTrieNode(batch, oldest, node.rlp())

        // If we exceeded the ideal batch size, commit and reset
        if batch.ValueSize() >= ethdb.IdealBatchSize {
            if err := batch.Write(); err != nil {
                log.Error("Failed to write flush list to disk", "err", err)
                return err
            }
            batch.Reset()
        }
        // Iterate to the next flush item, or abort if the size cap was achieved. Size
        // is the total size, including the useful cached data (hash -> blob), the
        // cache item metadata, as well as external children mappings.
        size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize)
        if node.children != nil {
            size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
        }
        oldest = node.flushNext
    }
    // Flush out any remainder data from the last batch
    if err := batch.Write(); err != nil {
        log.Error("Failed to write flush list to disk", "err", err)
        return err
    }
    // Write successful, clear out the flushed data
    db.lock.Lock()
    defer db.lock.Unlock()

    if flushPreimages {
        if db.preimages == nil {
            log.Error("Attempted to reset preimage cache whilst disabled")
        } else {
            db.preimages, db.preimagesSize = make(map[common.Hash][]byte), 0
        }
    }
    for db.oldest != oldest {
        node := db.dirties[db.oldest]
        delete(db.dirties, db.oldest)
        db.oldest = node.flushNext

        db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
        if node.children != nil {
            db.childrenSize -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
        }
    }
    if db.oldest != (common.Hash{}) {
        db.dirties[db.oldest].flushPrev = common.Hash{}
    }
    db.flushnodes += uint64(nodes - len(db.dirties))
    db.flushsize += storage - db.dirtiesSize
    db.flushtime += time.Since(start)

    memcacheFlushTimeTimer.Update(time.Since(start))
    memcacheFlushSizeMeter.Mark(int64(storage - db.dirtiesSize))
    memcacheFlushNodesMeter.Mark(int64(nodes - len(db.dirties)))

    log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start),
        "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)

    return nil
}

// Commit iterates over all the children of a particular node, writes them out
// to disk, forcefully tearing down all references in both directions. As a side
// effect, all pre-images accumulated up to this point are also written.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Commit(node common.Hash, report bool, callback func(common.Hash)) error {
    // Create a database batch to flush persistent data out. It is important that
    // outside code doesn't see an inconsistent state (referenced data removed from
    // memory cache during commit but not yet in persistent storage). This is ensured
    // by only uncaching existing data when the database write finalizes.
    start := time.Now()
    batch := db.diskdb.NewBatch()

    // Move all of the accumulated preimages into a write batch
    if db.preimages != nil {
        rawdb.WritePreimages(batch, db.preimages)
        // Since we're going to replay trie node writes into the clean cache, flush out
        // any batched pre-images before continuing.
        if err := batch.Write(); err != nil {
            return err
        }
        batch.Reset()
    }
    // Move the trie itself into the batch, flushing if enough data is accumulated
    nodes, storage := len(db.dirties), db.dirtiesSize

    uncacher := &cleaner{db}
    if err := db.commit(node, batch, uncacher, callback); err != nil {
        log.Error("Failed to commit trie from trie database", "err", err)
        return err
    }
    // Trie mostly committed to disk, flush any batch leftovers
    if err := batch.Write(); err != nil {
        log.Error("Failed to write trie to disk", "err", err)
        return err
    }
    // Uncache any leftovers in the last batch
    db.lock.Lock()
    defer db.lock.Unlock()

    batch.Replay(uncacher)
    batch.Reset()

    // Reset the storage counters and bump the metrics
    if db.preimages != nil {
        db.preimages, db.preimagesSize = make(map[common.Hash][]byte), 0
    }
    memcacheCommitTimeTimer.Update(time.Since(start))
    memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
    memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))

    logger := log.Info
    if !report {
        logger = log.Debug
    }
    logger("Persisted trie from memory database", "nodes", nodes-len(db.dirties)+int(db.flushnodes), "size", storage-db.dirtiesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
        "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize)

    // Reset the garbage collection statistics
    db.gcnodes, db.gcsize, db.gctime = 0, 0, 0
    db.flushnodes, db.flushsize, db.flushtime = 0, 0, 0

    return nil
}

// commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error {
    // If the node does not exist, it's a previously committed node
    node, ok := db.dirties[hash]
    if !ok {
        return nil
    }
    var err error
    node.forChilds(func(child common.Hash) {
        if err == nil {
            err = db.commit(child, batch, uncacher, callback)
        }
    })
    if err != nil {
        return err
    }
    // If we've reached an optimal batch size, commit and start over
    rawdb.WriteTrieNode(batch, hash, node.rlp())
    if callback != nil {
        callback(hash)
    }
    if batch.ValueSize() >= ethdb.IdealBatchSize {
        if err := batch.Write(); err != nil {
            return err
        }
        db.lock.Lock()
        batch.Replay(uncacher)
        batch.Reset()
        db.lock.Unlock()
    }
    return nil
}

// cleaner is a database batch replayer that takes a batch of write operations
// and cleans up the trie database from anything written to disk.
type cleaner struct {
    db *Database
}

// Put reacts to database writes and implements dirty data uncaching. This is the
// post-processing step of a commit operation where the already persisted trie is
// removed from the dirty cache and moved into the clean cache. The reason behind
// the two-phase commit is to ensure data availability while moving from memory
// to disk.
func (c *cleaner) Put(key []byte, rlp []byte) error {
    hash := common.BytesToHash(key)

    // If the node does not exist, we're done on this path
    node, ok := c.db.dirties[hash]
    if !ok {
        return nil
    }
    // Node still exists, remove it from the flush-list
    switch hash {
    case c.db.oldest:
        c.db.oldest = node.flushNext
        c.db.dirties[node.flushNext].flushPrev = common.Hash{}
    case c.db.newest:
        c.db.newest = node.flushPrev
        c.db.dirties[node.flushPrev].flushNext = common.Hash{}
    default:
        c.db.dirties[node.flushPrev].flushNext = node.flushNext
        c.db.dirties[node.flushNext].flushPrev = node.flushPrev
    }
    // Remove the node from the dirty cache
    delete(c.db.dirties, hash)
    c.db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
    if node.children != nil {
        c.db.dirtiesSize -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2))
    }
    // Move the flushed node into the clean cache to prevent insta-reloads
    if c.db.cleans != nil {
        c.db.cleans.Set(hash[:], rlp)
        memcacheCleanWriteMeter.Mark(int64(len(rlp)))
    }
    return nil
}

func (c *cleaner) Delete(key []byte) error {
    panic("not implemented")
}

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
    db.sizeLock.RLock()
    defer db.sizeLock.RUnlock()
    return db.roughDirtiesSize, db.roughPreimagesSize
}

// saveCache saves the clean state cache to the given directory path using the
// specified number of CPU cores.
func (db *Database) saveCache(dir string, threads int) error {
    if db.cleans == nil {
        return nil
    }
    log.Info("Writing clean trie cache to disk", "path", dir, "threads", threads)

    start := time.Now()
    err := db.cleans.SaveToFileConcurrent(dir, threads)
    if err != nil {
        log.Error("Failed to persist clean trie cache", "error", err)
        return err
    }
    log.Info("Persisted the clean trie cache", "path", dir, "elapsed", common.PrettyDuration(time.Since(start)))
    return nil
}

// SaveCache atomically saves fast cache data to the given dir using all
// available CPU cores.
func (db *Database) SaveCache(dir string) error {
    return db.saveCache(dir, runtime.GOMAXPROCS(0))
}

// SaveCachePeriodically atomically saves fast cache data to the given dir with
// the specified interval. All dump operations will only use a single CPU core.
func (db *Database) SaveCachePeriodically(dir string, interval time.Duration, stopCh <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            db.saveCache(dir, 1)
        case <-stopCh:
            return
        }
    }
}
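
// Illustrative persistence sketch: the journal written by the savers above is
// what Config.Journal reloads through fastcache.LoadFromFileOrNew on the next
// start. `db` and the path are placeholders:
//
//     stopCh := make(chan struct{})
//     go db.SaveCachePeriodically("/datadir/triecache", 10*time.Minute, stopCh)
//     ...
//     close(stopCh)                          // stop the periodic saver on shutdown
//     _ = db.SaveCache("/datadir/triecache") // final dump using all CPU cores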