core, eth, ethstats: simplify chain head events (#30601)

This commit is contained in:
Péter Szilágyi 2024-10-16 10:32:58 +03:00 committed by GitHub
parent 15bf90ebc5
commit 368e16f39d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 48 additions and 173 deletions

@ -224,7 +224,6 @@ type BlockChain struct {
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
@ -571,15 +570,14 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}
@ -593,15 +591,14 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}
@ -1552,7 +1549,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
@ -1562,7 +1559,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
}
return CanonStatTy, nil
}
@ -1627,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
// Fire a single chain head event if we've progressed the chain
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
}
}()
// Start the parallel header verifier
@ -2328,9 +2325,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// Also send event for blocks removed from the canon chain.
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
@ -2403,11 +2397,11 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
// Emit events
logs := bc.collectLogs(head, false)
bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: head.Header()})
context := []interface{}{
"number", head.Number(),

@ -430,11 +430,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}
// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
}
// SubscribeLogsEvent registers a subscription of []*types.Log.
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))

@ -1332,85 +1332,6 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan Re
}
}
func TestReorgSideEvent(t *testing.T) {
testReorgSideEvent(t, rawdb.HashScheme)
testReorgSideEvent(t, rawdb.PathScheme)
}
func testReorgSideEvent(t *testing.T, scheme string) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
gspec = &Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000000)}},
}
signer = types.LatestSigner(gspec.Config)
)
blockchain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()
_, chain, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 3, func(i int, gen *BlockGen) {})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
_, replacementBlocks, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 4, func(i int, gen *BlockGen) {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, nil), signer, key1)
if i == 2 {
gen.OffsetTime(-9)
}
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
})
chainSideCh := make(chan ChainSideEvent, 64)
blockchain.SubscribeChainSideEvent(chainSideCh)
if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
expectedSideHashes := map[common.Hash]bool{
chain[0].Hash(): true,
chain[1].Hash(): true,
chain[2].Hash(): true,
}
i := 0
const timeoutDura = 10 * time.Second
timeout := time.NewTimer(timeoutDura)
done:
for {
select {
case ev := <-chainSideCh:
block := ev.Block
if _, ok := expectedSideHashes[block.Hash()]; !ok {
t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash())
}
i++
if i == len(expectedSideHashes) {
timeout.Stop()
break done
}
timeout.Reset(timeoutDura)
case <-timeout.C:
t.Fatalf("Timeout. Possibly not all blocks were triggered for sideevent: %v", i)
}
}
// make sure no more events are fired
select {
case e := <-chainSideCh:
t.Errorf("unexpected event fired: %v", e)
case <-time.After(250 * time.Millisecond):
}
}
// Tests if the canonical block can be fetched from the database during chain insertion.
func TestCanonicalBlockRetrieval(t *testing.T) {
testCanonicalBlockRetrieval(t, rawdb.HashScheme)

@ -222,20 +222,19 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH
errc <- nil
return
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
if ev.Header.ParentHash != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
}
c.newHead(header.Number.Uint64(), false)
c.newHead(ev.Header.Number.Uint64(), false)
prevHeader, prevHash = header, header.Hash()
prevHeader, prevHash = ev.Header, ev.Header.Hash()
}
}
}

@ -17,27 +17,19 @@
package core
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
type ChainEvent struct {
Block *types.Block
Hash common.Hash
Logs []*types.Log
Header *types.Header
}
type ChainSideEvent struct {
Block *types.Block
type ChainHeadEvent struct {
Header *types.Header
}
type ChainHeadEvent struct{ Block *types.Block }

@ -151,9 +151,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
if done == nil {
stop = make(chan struct{})
done = make(chan struct{})
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
}
lastHead = head.Block.NumberU64()
lastHead = head.Header.Number.Uint64()
case <-done:
stop = nil
done = nil

@ -243,7 +243,7 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
select {
case event := <-newHeadCh:
// Chain moved forward, store the head for later consumption
newHead = event.Block.Header()
newHead = event.Header
case head := <-resetDone:
// Previous reset finished, update the old head and allow a new reset

@ -275,10 +275,6 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}
func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}
func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}

@ -123,16 +123,16 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
timer := time.NewTimer(12 * time.Second)
for {
select {
case evt := <-chainHeadCh:
for _, includedTx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := ethService.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, includedTx := range block.Transactions() {
includedTxs[includedTx.Hash()] = struct{}{}
}
for _, includedWithdrawal := range evt.Block.Withdrawals() {
for _, includedWithdrawal := range block.Withdrawals() {
includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && evt.Block.Number().Cmp(big.NewInt(2)) == 0 {
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && ev.Header.Number.Cmp(big.NewInt(2)) == 0 {
return
}
case <-timer.C:
@ -186,11 +186,12 @@ func TestOnDemandSpam(t *testing.T) {
)
for {
select {
case evt := <-chainHeadCh:
for _, itx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, itx := range block.Transactions() {
includedTxs[itx.Hash()] = struct{}{}
}
for _, iwx := range evt.Block.Withdrawals() {
for _, iwx := range block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10

@ -391,7 +391,7 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
f.headers <- ev.Header
}
}

@ -200,7 +200,7 @@ func TestBlockSubscription(t *testing.T) {
)
for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
chainEvents = append(chainEvents, core.ChainEvent{Header: blk.Header()})
}
chan0 := make(chan *types.Header)
@ -213,13 +213,13 @@ func TestBlockSubscription(t *testing.T) {
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case header := <-chan0:
if chainEvents[i1].Hash != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
if chainEvents[i1].Header.Hash() != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Header.Hash(), header.Hash())
}
i1++
case header := <-chan1:
if chainEvents[i2].Hash != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
if chainEvents[i2].Header.Hash() != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Header.Hash(), header.Hash())
}
i2++
}

@ -124,10 +124,10 @@ func NewOracle(backend OracleBackend, params Config, startPrice *big.Int) *Oracl
go func() {
var lastHead common.Hash
for ev := range headEvent {
if ev.Block.ParentHash() != lastHead {
if ev.Header.ParentHash != lastHead {
cache.Purge()
}
lastHead = ev.Block.Hash()
lastHead = ev.Header.Hash()
}
}()

@ -219,7 +219,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
headCh = make(chan *types.Header, 1)
txCh = make(chan struct{}, 1)
)
go func() {
@ -231,7 +231,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
// Notify of chain head events, but drop if too frequent
case head := <-chainHeadCh:
select {
case headCh <- head.Block:
case headCh <- head.Header:
default:
}
@ -602,9 +602,9 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
}
// reportBlock retrieves the current chain head and reports it to the stats server.
func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
func (s *Service) reportBlock(conn *connWrapper, header *types.Header) error {
// Gather the block details from the header or block chain
details := s.assembleBlockStats(block)
details := s.assembleBlockStats(header)
// Short circuit if the block detail is not available.
if details == nil {
@ -625,10 +625,9 @@ func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
// assembleBlockStats retrieves any required metadata to report a single block
// and assembles the block stats. If block is nil, the current head is processed.
func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
func (s *Service) assembleBlockStats(header *types.Header) *blockStats {
// Gather the block infos from the local blockchain
var (
header *types.Header
td *big.Int
txs []txStats
uncles []*types.Header
@ -638,16 +637,13 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
fullBackend, ok := s.backend.(fullNodeBackend)
if ok {
// Retrieve current chain head if no block is given.
if block == nil {
head := fullBackend.CurrentBlock()
block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(head.Number.Uint64()))
if header == nil {
header = fullBackend.CurrentBlock()
}
// Short circuit if no block is available. It might happen when
// the blockchain is reorging.
block, _ := fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(header.Number.Uint64()))
if block == nil {
return nil
}
header = block.Header()
td = fullBackend.GetTd(context.Background(), header.Hash())
txs = make([]txStats, len(block.Transactions()))
@ -657,15 +653,12 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
uncles = block.Uncles()
} else {
// Light nodes would need on-demand lookups for transactions/uncles, skip
if block != nil {
header = block.Header()
} else {
if header == nil {
header = s.backend.CurrentHeader()
}
td = s.backend.GetTd(context.Background(), header.Hash())
txs = []txStats{}
}
// Assemble and return the block stats
author, _ := s.engine.Author(header)
@ -708,19 +701,10 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
// Gather the batch of blocks to report
history := make([]*blockStats, len(indexes))
for i, number := range indexes {
fullBackend, ok := s.backend.(fullNodeBackend)
// Retrieve the next block if it's known to us
var block *types.Block
if ok {
block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ?
} else {
if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil {
block = types.NewBlockWithHeader(header)
}
}
// If we do have the block, add to the history and continue
if block != nil {
history[len(history)-1-i] = s.assembleBlockStats(block)
header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number))
if header != nil {
history[len(history)-1-i] = s.assembleBlockStats(header)
continue
}
// Ran out of blocks, cut the report short and send

@ -587,9 +587,6 @@ func (b testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscr
func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
panic("implement me")
}
func (b testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
panic("implement me")
}
func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
panic("implement me")
}

@ -71,7 +71,6 @@ type Backend interface {
GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) *vm.EVM
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error

@ -377,9 +377,6 @@ func (b *backendMock) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
func (b *backendMock) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
return nil
}
func (b *backendMock) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return nil
}
func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil }
func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
return false, nil, [32]byte{}, 0, 0, nil