miner/worker: broadcast block immediately once sealed (#2576)

This commit is contained in:
buddho 2024-07-16 21:24:37 +08:00 committed by GitHub
parent d35b57ae36
commit 6d5b4ad64d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 20 additions and 12 deletions

@ -1908,26 +1908,30 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// WriteBlockAndSetHead writes the given block and all associated state to the database,
// and applies the block as the new chain head.
func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool, mux *event.TypeMux) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errChainStopped
}
defer bc.chainmu.Unlock()
return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent)
return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent, mux)
}
// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
// This function expects the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
if err := bc.writeBlockWithState(block, receipts, state); err != nil {
return NonStatTy, err
}
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool, mux *event.TypeMux) (status WriteStatus, err error) {
currentBlock := bc.CurrentBlock()
reorg, err := bc.forker.ReorgNeededWithFastFinality(currentBlock, block.Header())
if err != nil {
return NonStatTy, err
}
if reorg && mux != nil {
mux.Post(NewSealedBlockEvent{Block: block})
}
if err := bc.writeBlockWithState(block, receipts, state); err != nil {
return NonStatTy, err
}
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
@ -2300,7 +2304,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, statedb)
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false, nil)
}
if err != nil {
return it.index, err

@ -27,7 +27,10 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }
// NewMinedBlockEvent is posted when a block has been imported.
// NewSealedBlockEvent is posted when a block has been sealed.
type NewSealedBlockEvent struct{ Block *types.Block }
// NewMinedBlockEvent is posted when a block has been mined.
type NewMinedBlockEvent struct{ Block *types.Block }
// RemovedLogsEvent is posted when a reorg happens

@ -729,7 +729,7 @@ func (h *handler) Start(maxPeers int, maxPeersPerIP int) {
// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}, core.NewSealedBlockEvent{})
go h.minedBroadcastLoop()
// start sync handlers
@ -946,8 +946,9 @@ func (h *handler) minedBroadcastLoop() {
if obj == nil {
continue
}
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
if ev, ok := obj.Data.(core.NewSealedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // Propagate block to peers
} else if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
case <-h.stopCh:

@ -665,7 +665,7 @@ func (w *worker) resultLoop() {
// Commit block and state to database.
task.state.SetExpectedStateRoot(block.Root())
start := time.Now()
status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true)
status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true, w.mux)
if status != core.CanonStatTy {
if err != nil {
log.Error("Failed writing block to chain", "err", err, "status", status)