core: reduce peak memory usage during reorg (#30600)
~~Opening this as a draft to have a discussion.~~ Pressed the wrong button I had [a previous PR ](https://github.com/ethereum/go-ethereum/pull/24616)a long time ago which reduced the peak memory used during reorgs by not accumulating all transactions and logs. This PR reduces the peak memory further by not storing the blocks in memory. However this means we need to pull the blocks back up from storage multiple times during the reorg. I collected the following numbers on peak memory usage: // Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used // WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used // WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used Each block contains a transaction with ~50k bytes and we're doing a 10k block reorg, so the chain should be ~500MB in size --------- Co-authored-by: Péter Szilágyi <peterke@gmail.com>
This commit is contained in:
parent
368e16f39d
commit
18a591811f
@ -23,6 +23,7 @@ import (
|
||||
"io"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -1435,7 +1436,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
|
||||
func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
|
||||
current := bc.CurrentBlock()
|
||||
if block.ParentHash() != current.Hash() {
|
||||
if err := bc.reorg(current, block); err != nil {
|
||||
if err := bc.reorg(current, block.Header()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -1541,7 +1542,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
|
||||
|
||||
// Reorganise the chain if the parent is not the head block
|
||||
if block.ParentHash() != currentBlock.Hash() {
|
||||
if err := bc.reorg(currentBlock, block); err != nil {
|
||||
if err := bc.reorg(currentBlock, block.Header()); err != nil {
|
||||
return NonStatTy, err
|
||||
}
|
||||
}
|
||||
@ -2154,8 +2155,8 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
|
||||
return block.Hash(), nil
|
||||
}
|
||||
|
||||
// collectLogs collects the logs that were generated or removed during
|
||||
// the processing of a block. These logs are later announced as deleted or reborn.
|
||||
// collectLogs collects the logs that were generated or removed during the
|
||||
// processing of a block. These logs are later announced as deleted or reborn.
|
||||
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
|
||||
var blobGasPrice *big.Int
|
||||
excessBlobGas := b.ExcessBlobGas()
|
||||
@ -2181,70 +2182,55 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
|
||||
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
|
||||
// blocks and inserts them to be part of the new canonical chain and accumulates
|
||||
// potential missing transactions and post an event about them.
|
||||
//
|
||||
// Note the new head block won't be processed here, callers need to handle it
|
||||
// externally.
|
||||
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
|
||||
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error {
|
||||
var (
|
||||
newChain types.Blocks
|
||||
oldChain types.Blocks
|
||||
commonBlock *types.Block
|
||||
|
||||
deletedTxs []common.Hash
|
||||
addedTxs []common.Hash
|
||||
newChain []*types.Header
|
||||
oldChain []*types.Header
|
||||
commonBlock *types.Header
|
||||
)
|
||||
oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
|
||||
if oldBlock == nil {
|
||||
return errors.New("current head block missing")
|
||||
}
|
||||
newBlock := newHead
|
||||
|
||||
// Reduce the longer chain to the same number as the shorter one
|
||||
if oldBlock.NumberU64() > newBlock.NumberU64() {
|
||||
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
|
||||
// Old chain is longer, gather all transactions and logs as deleted ones
|
||||
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
|
||||
oldChain = append(oldChain, oldBlock)
|
||||
for _, tx := range oldBlock.Transactions() {
|
||||
deletedTxs = append(deletedTxs, tx.Hash())
|
||||
}
|
||||
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
|
||||
oldChain = append(oldChain, oldHead)
|
||||
}
|
||||
} else {
|
||||
// New chain is longer, stash all blocks away for subsequent insertion
|
||||
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
|
||||
newChain = append(newChain, newBlock)
|
||||
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
|
||||
newChain = append(newChain, newHead)
|
||||
}
|
||||
}
|
||||
if oldBlock == nil {
|
||||
if oldHead == nil {
|
||||
return errInvalidOldChain
|
||||
}
|
||||
if newBlock == nil {
|
||||
if newHead == nil {
|
||||
return errInvalidNewChain
|
||||
}
|
||||
// Both sides of the reorg are at the same number, reduce both until the common
|
||||
// ancestor is found
|
||||
for {
|
||||
// If the common ancestor was found, bail out
|
||||
if oldBlock.Hash() == newBlock.Hash() {
|
||||
commonBlock = oldBlock
|
||||
if oldHead.Hash() == newHead.Hash() {
|
||||
commonBlock = oldHead
|
||||
break
|
||||
}
|
||||
// Remove an old block as well as stash away a new block
|
||||
oldChain = append(oldChain, oldBlock)
|
||||
for _, tx := range oldBlock.Transactions() {
|
||||
deletedTxs = append(deletedTxs, tx.Hash())
|
||||
}
|
||||
newChain = append(newChain, newBlock)
|
||||
oldChain = append(oldChain, oldHead)
|
||||
newChain = append(newChain, newHead)
|
||||
|
||||
// Step back with both chains
|
||||
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
|
||||
if oldBlock == nil {
|
||||
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
|
||||
if oldHead == nil {
|
||||
return errInvalidOldChain
|
||||
}
|
||||
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
|
||||
if newBlock == nil {
|
||||
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
|
||||
if newHead == nil {
|
||||
return errInvalidNewChain
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the user sees large reorgs
|
||||
if len(oldChain) > 0 && len(newChain) > 0 {
|
||||
logFn := log.Info
|
||||
@ -2253,7 +2239,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
|
||||
msg = "Large chain reorg detected"
|
||||
logFn = log.Warn
|
||||
}
|
||||
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
|
||||
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash(),
|
||||
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
|
||||
blockReorgAddMeter.Mark(int64(len(newChain)))
|
||||
blockReorgDropMeter.Mark(int64(len(oldChain)))
|
||||
@ -2261,55 +2247,112 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
|
||||
} else if len(newChain) > 0 {
|
||||
// Special case happens in the post merge stage that current head is
|
||||
// the ancestor of new head while these two blocks are not consecutive
|
||||
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
|
||||
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
|
||||
blockReorgAddMeter.Mark(int64(len(newChain)))
|
||||
} else {
|
||||
// len(newChain) == 0 && len(oldChain) > 0
|
||||
// rewind the canonical chain to a lower point.
|
||||
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain))
|
||||
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
|
||||
}
|
||||
// Acquire the tx-lookup lock before mutation. This step is essential
|
||||
// as the txlookups should be changed atomically, and all subsequent
|
||||
// reads should be blocked until the mutation is complete.
|
||||
bc.txLookupLock.Lock()
|
||||
|
||||
// Insert the new chain segment in incremental order, from the old
|
||||
// to the new. The new chain head (newChain[0]) is not inserted here,
|
||||
// as it will be handled separately outside of this function
|
||||
for i := len(newChain) - 1; i >= 1; i-- {
|
||||
// Insert the block in the canonical way, re-writing history
|
||||
bc.writeHeadBlock(newChain[i])
|
||||
// Reorg can be executed, start reducing the chain's old blocks and appending
|
||||
// the new blocks
|
||||
var (
|
||||
deletedTxs []common.Hash
|
||||
rebirthTxs []common.Hash
|
||||
|
||||
// Collect the new added transactions.
|
||||
for _, tx := range newChain[i].Transactions() {
|
||||
addedTxs = append(addedTxs, tx.Hash())
|
||||
deletedLogs []*types.Log
|
||||
rebirthLogs []*types.Log
|
||||
)
|
||||
// Deleted log emission on the API uses forward order, which is borked, but
|
||||
// we'll leave it in for legacy reasons.
|
||||
//
|
||||
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
|
||||
{
|
||||
for i := len(oldChain) - 1; i >= 0; i-- {
|
||||
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
|
||||
if block == nil {
|
||||
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
|
||||
}
|
||||
if logs := bc.collectLogs(block, true); len(logs) > 0 {
|
||||
deletedLogs = append(deletedLogs, logs...)
|
||||
}
|
||||
if len(deletedLogs) > 512 {
|
||||
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
|
||||
deletedLogs = nil
|
||||
}
|
||||
}
|
||||
if len(deletedLogs) > 0 {
|
||||
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
|
||||
}
|
||||
}
|
||||
// Undo old blocks in reverse order
|
||||
for i := 0; i < len(oldChain); i++ {
|
||||
// Collect all the deleted transactions
|
||||
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
|
||||
if block == nil {
|
||||
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
|
||||
}
|
||||
for _, tx := range block.Transactions() {
|
||||
deletedTxs = append(deletedTxs, tx.Hash())
|
||||
}
|
||||
// Collect deleted logs and emit them for new integrations
|
||||
if logs := bc.collectLogs(block, true); len(logs) > 0 {
|
||||
// Emit revertals latest first, older then
|
||||
slices.Reverse(logs)
|
||||
|
||||
// TODO(karalabe): Hook into the reverse emission part
|
||||
}
|
||||
}
|
||||
// Apply new blocks in forward order
|
||||
for i := len(newChain) - 1; i >= 1; i-- {
|
||||
// Collect all the included transactions
|
||||
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
|
||||
if block == nil {
|
||||
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
|
||||
}
|
||||
for _, tx := range block.Transactions() {
|
||||
rebirthTxs = append(rebirthTxs, tx.Hash())
|
||||
}
|
||||
// Collect inserted logs and emit them
|
||||
if logs := bc.collectLogs(block, false); len(logs) > 0 {
|
||||
rebirthLogs = append(rebirthLogs, logs...)
|
||||
}
|
||||
if len(rebirthLogs) > 512 {
|
||||
bc.logsFeed.Send(rebirthLogs)
|
||||
rebirthLogs = nil
|
||||
}
|
||||
// Update the head block
|
||||
bc.writeHeadBlock(block)
|
||||
}
|
||||
if len(rebirthLogs) > 0 {
|
||||
bc.logsFeed.Send(rebirthLogs)
|
||||
}
|
||||
// Delete useless indexes right now which includes the non-canonical
|
||||
// transaction indexes, canonical chain indexes which above the head.
|
||||
var (
|
||||
indexesBatch = bc.db.NewBatch()
|
||||
diffs = types.HashDifference(deletedTxs, addedTxs)
|
||||
)
|
||||
for _, tx := range diffs {
|
||||
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
|
||||
batch := bc.db.NewBatch()
|
||||
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
|
||||
rawdb.DeleteTxLookupEntry(batch, tx)
|
||||
}
|
||||
// Delete all hash markers that are not part of the new canonical chain.
|
||||
// Because the reorg function does not handle new chain head, all hash
|
||||
// markers greater than or equal to new chain head should be deleted.
|
||||
number := commonBlock.NumberU64()
|
||||
number := commonBlock.Number
|
||||
if len(newChain) > 1 {
|
||||
number = newChain[1].NumberU64()
|
||||
number = newChain[1].Number
|
||||
}
|
||||
for i := number + 1; ; i++ {
|
||||
for i := number.Uint64() + 1; ; i++ {
|
||||
hash := rawdb.ReadCanonicalHash(bc.db, i)
|
||||
if hash == (common.Hash{}) {
|
||||
break
|
||||
}
|
||||
rawdb.DeleteCanonicalHash(indexesBatch, i)
|
||||
rawdb.DeleteCanonicalHash(batch, i)
|
||||
}
|
||||
if err := indexesBatch.Write(); err != nil {
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to delete useless indexes", "err", err)
|
||||
}
|
||||
// Reset the tx lookup cache to clear stale txlookup cache.
|
||||
@ -2318,40 +2361,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
|
||||
// Release the tx-lookup lock after mutation.
|
||||
bc.txLookupLock.Unlock()
|
||||
|
||||
// Send out events for logs from the old canon chain, and 'reborn'
|
||||
// logs from the new canon chain. The number of logs can be very
|
||||
// high, so the events are sent in batches of size around 512.
|
||||
|
||||
// Deleted logs + blocks:
|
||||
var deletedLogs []*types.Log
|
||||
for i := len(oldChain) - 1; i >= 0; i-- {
|
||||
// Collect deleted logs for notification
|
||||
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
|
||||
deletedLogs = append(deletedLogs, logs...)
|
||||
}
|
||||
if len(deletedLogs) > 512 {
|
||||
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
|
||||
deletedLogs = nil
|
||||
}
|
||||
}
|
||||
if len(deletedLogs) > 0 {
|
||||
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
|
||||
}
|
||||
|
||||
// New logs:
|
||||
var rebirthLogs []*types.Log
|
||||
for i := len(newChain) - 1; i >= 1; i-- {
|
||||
if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 {
|
||||
rebirthLogs = append(rebirthLogs, logs...)
|
||||
}
|
||||
if len(rebirthLogs) > 512 {
|
||||
bc.logsFeed.Send(rebirthLogs)
|
||||
rebirthLogs = nil
|
||||
}
|
||||
}
|
||||
if len(rebirthLogs) > 0 {
|
||||
bc.logsFeed.Send(rebirthLogs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -2389,7 +2398,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
|
||||
// Run the reorg if necessary and set the given block as new head.
|
||||
start := time.Now()
|
||||
if head.ParentHash() != bc.CurrentBlock().Hash() {
|
||||
if err := bc.reorg(bc.CurrentBlock(), head); err != nil {
|
||||
if err := bc.reorg(bc.CurrentBlock(), head.Header()); err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
}
|
||||
|
@ -4231,3 +4231,36 @@ func TestPragueRequests(t *testing.T) {
|
||||
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkReorg(b *testing.B) {
|
||||
chainLength := b.N
|
||||
|
||||
dir := b.TempDir()
|
||||
db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot create temporary database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
gspec := &Genesis{
|
||||
Config: params.TestChainConfig,
|
||||
Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}},
|
||||
}
|
||||
blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
|
||||
defer blockchain.Stop()
|
||||
|
||||
// Insert an easy and a difficult chain afterwards
|
||||
easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
|
||||
diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
|
||||
|
||||
if _, err := blockchain.InsertChain(easyBlocks); err != nil {
|
||||
b.Fatalf("failed to insert easy chain: %v", err)
|
||||
}
|
||||
b.ResetTimer()
|
||||
if _, err := blockchain.InsertChain(diffBlocks); err != nil {
|
||||
b.Fatalf("failed to insert difficult chain: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used
|
||||
// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used
|
||||
// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used
|
||||
|
Loading…
Reference in New Issue
Block a user