From 389021a5afd01147c851870c693ded8760e6a08c Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 9 Sep 2022 15:25:55 +0200 Subject: [PATCH] core: place a cap on reorglogs (#25711) This PR makes the event-sending for deleted and new logs happen in batches, to prevent OOM situation due to large reorgs. Co-authored-by: Felix Lange --- core/blockchain.go | 78 ++++++++++++++-------------------- core/blockchain_test.go | 94 +++++++++++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 74 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index f588cc50bd..f73bbb09c2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2000,21 +2000,6 @@ func (bc *BlockChain) collectLogs(hash common.Hash, removed bool) []*types.Log { return logs } -// mergeLogs returns a merged log slice with specified sort order. -func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log { - var ret []*types.Log - if reverse { - for i := len(logs) - 1; i >= 0; i-- { - ret = append(ret, logs[i]...) - } - } else { - for i := 0; i < len(logs); i++ { - ret = append(ret, logs[i]...) - } - } - return ret -} - // reorg takes two blocks, an old chain and a new chain and will reconstruct the // blocks and inserts them to be part of the new canonical chain and accumulates // potential missing transactions and post an event about them. @@ -2028,9 +2013,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { deletedTxs []common.Hash addedTxs []common.Hash - - deletedLogs [][]*types.Log - rebirthLogs [][]*types.Log ) // Reduce the longer chain to the same number as the shorter one if oldBlock.NumberU64() > newBlock.NumberU64() { @@ -2040,12 +2022,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for _, tx := range oldBlock.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } - - // Collect deleted logs for notification - logs := bc.collectLogs(oldBlock.Hash(), true) - if len(logs) > 0 { - deletedLogs = append(deletedLogs, logs) - } } } else { // New chain is longer, stash all blocks away for subsequent insertion @@ -2072,12 +2048,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for _, tx := range oldBlock.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } - - // Collect deleted logs for notification - logs := bc.collectLogs(oldBlock.Hash(), true) - if len(logs) > 0 { - deletedLogs = append(deletedLogs, logs) - } newChain = append(newChain, newBlock) // Step back with both chains @@ -2151,28 +2121,42 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { log.Crit("Failed to delete useless indexes", "err", err) } - // Collect the logs - for i := len(newChain) - 1; i >= 1; i-- { - // Collect reborn logs due to chain reorg - logs := bc.collectLogs(newChain[i].Hash(), false) - if len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs) + // Send out events for logs from the old canon chain, and 'reborn' + // logs from the new canon chain. The number of logs can be very + // high, so the events are sent in batches of size around 512. + + // Deleted logs + blocks: + var deletedLogs []*types.Log + for i := len(oldChain) - 1; i >= 0; i-- { + // Also send event for blocks removed from the canon chain. + bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) + + // Collect deleted logs for notification + if logs := bc.collectLogs(oldChain[i].Hash(), true); len(logs) > 0 { + deletedLogs = append(deletedLogs, logs...) + } + if len(deletedLogs) > 512 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + deletedLogs = nil } } - // If any logs need to be fired, do it now. In theory we could avoid creating - // this goroutine if there are no events to fire, but realistcally that only - // ever happens if we're reorging empty blocks, which will only happen on idle - // networks where performance is not an issue either way. if len(deletedLogs) > 0 { - bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + } + + // New logs: + var rebirthLogs []*types.Log + for i := len(newChain) - 1; i >= 1; i-- { + if logs := bc.collectLogs(newChain[i].Hash(), false); len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs...) + } + if len(rebirthLogs) > 512 { + bc.logsFeed.Send(rebirthLogs) + rebirthLogs = nil + } } if len(rebirthLogs) > 0 { - bc.logsFeed.Send(mergeLogs(rebirthLogs, false)) - } - if len(oldChain) > 0 { - for i := len(oldChain) - 1; i >= 0; i-- { - bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) - } + bc.logsFeed.Send(rebirthLogs) } return nil } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index c17a81048f..06c43658ed 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1158,37 +1158,53 @@ func TestLogRebirth(t *testing.T) { blockchain.SubscribeLogsEvent(newLogCh) blockchain.SubscribeRemovedLogsEvent(rmLogsCh) - // This chain contains a single log. - genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) { - if i == 1 { - tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1) - if err != nil { - t.Fatalf("failed to create tx: %v", err) + // This chain contains 10 logs. + genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) { + if i < 2 { + for ii := 0; ii < 5; ii++ { + tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{ + Nonce: gen.TxNonce(addr1), + GasPrice: gen.header.BaseFee, + Gas: uint64(1000001), + Data: logCode, + }) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) } - gen.AddTx(tx) } }) if _, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert chain: %v", err) } - checkLogEvents(t, newLogCh, rmLogsCh, 1, 0) + checkLogEvents(t, newLogCh, rmLogsCh, 10, 0) - // Generate long reorg chain containing another log. Inserting the - // chain removes one log and adds one. - _, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) { - if i == 1 { - tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1) + // Generate long reorg chain containing more logs. Inserting the + // chain removes one log and adds four. + _, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) { + if i == 2 { + // The last (head) block is not part of the reorg-chain, we can ignore it + return + } + for ii := 0; ii < 5; ii++ { + tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{ + Nonce: gen.TxNonce(addr1), + GasPrice: gen.header.BaseFee, + Gas: uint64(1000000), + Data: logCode, + }) if err != nil { t.Fatalf("failed to create tx: %v", err) } gen.AddTx(tx) - gen.OffsetTime(-9) // higher block difficulty } + gen.OffsetTime(-9) // higher block difficulty }) if _, err := blockchain.InsertChain(forkChain); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } - checkLogEvents(t, newLogCh, rmLogsCh, 1, 1) + checkLogEvents(t, newLogCh, rmLogsCh, 10, 10) // This chain segment is rooted in the original chain, but doesn't contain any logs. // When inserting it, the canonical chain switches away from forkChain and re-emits @@ -1197,7 +1213,7 @@ func TestLogRebirth(t *testing.T) { if _, err := blockchain.InsertChain(newBlocks); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } - checkLogEvents(t, newLogCh, rmLogsCh, 1, 1) + checkLogEvents(t, newLogCh, rmLogsCh, 10, 10) } // This test is a variation of TestLogRebirth. It verifies that log events are emitted @@ -1252,19 +1268,43 @@ func TestSideLogRebirth(t *testing.T) { func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan RemovedLogsEvent, wantNew, wantRemoved int) { t.Helper() - - if len(logsCh) != wantNew { - t.Fatalf("wrong number of log events: got %d, want %d", len(logsCh), wantNew) - } - if len(rmLogsCh) != wantRemoved { - t.Fatalf("wrong number of removed log events: got %d, want %d", len(rmLogsCh), wantRemoved) - } + var ( + countNew int + countRm int + prev int + ) // Drain events. - for i := 0; i < len(logsCh); i++ { - <-logsCh + for len(logsCh) > 0 { + x := <-logsCh + countNew += len(x) + for _, log := range x { + // We expect added logs to be in ascending order: 0:0, 0:1, 1:0 ... + have := 100*int(log.BlockNumber) + int(log.TxIndex) + if have < prev { + t.Fatalf("Expected new logs to arrive in ascending order (%d < %d)", have, prev) + } + prev = have + } } - for i := 0; i < len(rmLogsCh); i++ { - <-rmLogsCh + prev = 0 + for len(rmLogsCh) > 0 { + x := <-rmLogsCh + countRm += len(x.Logs) + for _, log := range x.Logs { + // We expect removed logs to be in ascending order: 0:0, 0:1, 1:0 ... + have := 100*int(log.BlockNumber) + int(log.TxIndex) + if have < prev { + t.Fatalf("Expected removed logs to arrive in ascending order (%d < %d)", have, prev) + } + prev = have + } + } + + if countNew != wantNew { + t.Fatalf("wrong number of log events: got %d, want %d", countNew, wantNew) + } + if countRm != wantRemoved { + t.Fatalf("wrong number of removed log events: got %d, want %d", countRm, wantRemoved) } }