diff --git a/core/blockchain.go b/core/blockchain.go index 25be8d762a..235c1f3da2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -854,9 +854,22 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R return status, nil } -// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. If an error is returned -// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). +// InsertChain attempts to insert the given batch of blocks in to the canonical +// chain or, otherwise, create a fork. If an error is returned it will return +// the index number of the failing block as well an error describing what went +// wrong. +// +// After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { + n, events, logs, err := bc.insertChain(chain) + bc.PostChainEvents(events, logs) + return n, err +} + +// insertChain will execute the actual chain insertion and event aggregation. The +// only reason this method exists as a separate one is to make locking cleaner +// with deferred statements. +func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) { // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { @@ -864,7 +877,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) - return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), + return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) } } @@ -881,6 +894,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { var ( stats = insertStats{startTime: mclock.Now()} events = make([]interface{}, 0, len(chain)) + lastCanon *types.Block coalescedLogs []*types.Log ) // Start the parallel header verifier @@ -904,7 +918,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return i, ErrBlacklistedHash + return i, events, coalescedLogs, ErrBlacklistedHash } // Wait for the block's verification to complete bstart := time.Now() @@ -925,7 +939,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // if given. max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) if block.Time().Cmp(max) > 0 { - return i, fmt.Errorf("future block: %v > %v", block.Time(), max) + return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) } bc.futureBlocks.Add(block.Hash(), block) stats.queued++ @@ -939,7 +953,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } bc.reportBlock(block, nil, err) - return i, err + return i, events, coalescedLogs, err } // Create a new statedb using the parent block and report an // error if it fails. @@ -951,40 +965,35 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } state, err := state.New(parent.Root(), bc.stateCache) if err != nil { - return i, err + return i, events, coalescedLogs, err } // Process block using the parent state as reference point. receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - return i, err + return i, events, coalescedLogs, err } // Validate the state using the default validator err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) if err != nil { bc.reportBlock(block, receipts, err) - return i, err + return i, events, coalescedLogs, err } - // Write the block to the chain and get the status. status, err := bc.WriteBlockAndState(block, receipts, state) if err != nil { - return i, err + return i, events, coalescedLogs, err } switch status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) - // coalesce logs for later processing + coalescedLogs = append(coalescedLogs, logs...) blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) - // We need some control over the mining operation. Acquiring locks and waiting - // for the miner to create new block takes too long and in most cases isn't - // even necessary. - if bc.LastBlockHash() == block.Hash() { - events = append(events, ChainHeadEvent{block}) - } + lastCanon = block + case SideStatTy: log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) @@ -996,9 +1005,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { stats.usedGas += usedGas.Uint64() stats.report(chain, i) } - go bc.PostChainEvents(events, coalescedLogs) - - return 0, nil + // Append a single chain head event if we've progressed the chain + if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() { + events = append(events, ChainHeadEvent{lastCanon}) + } + return 0, events, coalescedLogs, nil } // insertStats tracks and reports on block insertion. diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 470974a0a7..700a30a16f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -935,7 +935,7 @@ func TestReorgSideEvent(t *testing.T) { } gen.AddTx(tx) }) - chainSideCh := make(chan ChainSideEvent) + chainSideCh := make(chan ChainSideEvent, 64) blockchain.SubscribeChainSideEvent(chainSideCh) if _, err := blockchain.InsertChain(replacementBlocks); err != nil { t.Fatalf("failed to insert chain: %v", err) diff --git a/miner/worker.go b/miner/worker.go index b48db2a30e..bf24970f5c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -125,8 +125,6 @@ type worker struct { // atomic status counters mining int32 atWork int32 - - fullValidation bool } func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { @@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), - fullValidation: false, } // Subscribe TxPreEvent for tx pool worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) @@ -297,50 +294,38 @@ func (self *worker) wait() { block := result.Block work := result.Work - if self.fullValidation { - if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil { - log.Error("Mined invalid block", "err", err) - continue + // Update the block hash in all logs since it is now available and not when the + // receipt/log of individual transactions were created. + for _, r := range work.receipts { + for _, l := range r.Logs { + l.BlockHash = block.Hash() } - go self.mux.Post(core.NewMinedBlockEvent{Block: block}) - } else { - // Update the block hash in all logs since it is now available and not when the - // receipt/log of individual transactions were created. - for _, r := range work.receipts { - for _, l := range r.Logs { - l.BlockHash = block.Hash() - } - } - for _, log := range work.state.Logs() { - log.BlockHash = block.Hash() - } - stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state) - if err != nil { - log.Error("Failed writing block to chain", "err", err) - continue - } - - // check if canon block and write transactions - if stat == core.CanonStatTy { - // implicit by posting ChainHeadEvent - mustCommitNewWork = false - } - // broadcast before waiting for validation - go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { - self.mux.Post(core.NewMinedBlockEvent{Block: block}) - var ( - events []interface{} - coalescedLogs []*types.Log - ) - events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - if stat == core.CanonStatTy { - events = append(events, core.ChainHeadEvent{Block: block}) - coalescedLogs = logs - } - // post blockchain events - self.chain.PostChainEvents(events, coalescedLogs) - }(block, work.state.Logs(), work.receipts) } + for _, log := range work.state.Logs() { + log.BlockHash = block.Hash() + } + stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state) + if err != nil { + log.Error("Failed writing block to chain", "err", err) + continue + } + // check if canon block and write transactions + if stat == core.CanonStatTy { + // implicit by posting ChainHeadEvent + mustCommitNewWork = false + } + // Broadcast the block and announce chain insertion event + self.mux.Post(core.NewMinedBlockEvent{Block: block}) + var ( + events []interface{} + logs = work.state.Logs() + ) + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + if stat == core.CanonStatTy { + events = append(events, core.ChainHeadEvent{Block: block}) + } + self.chain.PostChainEvents(events, logs) + // Insert the block into the set of pending ones to wait for confirmations self.unconfirmed.Insert(block.NumberU64(), block.Hash())