diff --git a/miner/worker.go b/miner/worker.go index 81a63c29a4..fae480c84c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -213,8 +213,9 @@ type worker struct { running int32 // The indicator whether the consensus engine is running or not. // Test hooks - newTaskHook func(*task) // Method to call upon receiving a new sealing task - fullTaskInterval func() // Method to call before pushing the full sealing task + newTaskHook func(*task) // Method to call upon receiving a new sealing task + skipSealHook func(*task) bool // Method to decide whether skipping the sealing. + fullTaskHook func() // Method to call before pushing the full sealing task } func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { @@ -329,8 +330,32 @@ func (w *worker) mainLoop() { w.commitNewWork() case ev := <-w.chainSideCh: + if _, exist := w.possibleUncles[ev.Block.Hash()]; exist { + continue + } // Add side block to possible uncle block set. w.possibleUncles[ev.Block.Hash()] = ev.Block + // If our mining block contains less than 2 uncle blocks, + // add the new uncle block if valid and regenerate a mining block. + if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 { + start := time.Now() + if err := w.commitUncle(w.current, ev.Block.Header()); err == nil { + var uncles []*types.Header + w.current.uncles.Each(func(item interface{}) bool { + hash, ok := item.(common.Hash) + if !ok { + return false + } + uncle, exist := w.possibleUncles[hash] + if !exist { + return false + } + uncles = append(uncles, uncle.Header()) + return true + }) + w.commit(uncles, nil, true, start) + } + } case ev := <-w.txsCh: // Apply transactions to the pending state if we're not mining. @@ -378,6 +403,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) { res *task ) + if w.skipSealHook != nil && w.skipSealHook(t) { + return + } + if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil { log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(), "elapsed", common.PrettyDuration(time.Since(t.createdAt))) @@ -637,30 +666,9 @@ func (w *worker) commitNewWork() { delete(w.possibleUncles, hash) } - var ( - emptyBlock, fullBlock *types.Block - emptyState, fullState *state.StateDB - ) - // Create an empty block based on temporary copied state for sealing in advance without waiting block // execution finished. - emptyState = env.state.Copy() - if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil { - log.Error("Failed to finalize block for temporary sealing", "err", err) - } else { - // Push empty work in advance without applying pending transaction. - // The reason is transactions execution can cost a lot and sealer need to - // take advantage of this part time. - if w.isRunning() { - select { - case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}: - log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) - case <-w.exitCh: - log.Info("Worker has exited") - return - } - } - } + w.commit(uncles, nil, false, tstart) // Fill the block with all available pending transactions. pending, err := w.eth.TxPool().Pending() @@ -676,31 +684,38 @@ func (w *worker) commitNewWork() { txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) env.commitTransactions(w.mux, txs, w.chain, w.coinbase) - // Create the full block to seal with the consensus engine - fullState = env.state.Copy() - if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil { - log.Error("Failed to finalize block for sealing", "err", err) - return - } - // Deep copy receipts here to avoid interaction between different tasks. - cpy := make([]*types.Receipt, len(env.receipts)) - for i, l := range env.receipts { - cpy[i] = new(types.Receipt) - *cpy[i] = *l - } - // We only care about logging if we're actually mining. - if w.isRunning() { - if w.fullTaskInterval != nil { - w.fullTaskInterval() - } + w.commit(uncles, w.fullTaskHook, true, tstart) +} +// commit runs any post-transaction state modifications, assembles the final block +// and commits new work if consensus engine is running. +func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { + // Deep copy receipts here to avoid interaction between different tasks. + receipts := make([]*types.Receipt, len(w.current.receipts)) + for i, l := range w.current.receipts { + receipts[i] = new(types.Receipt) + *receipts[i] = *l + } + s := w.current.state.Copy() + block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts) + if err != nil { + return err + } + if w.isRunning() { + if interval != nil { + interval() + } select { - case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}: - w.unconfirmed.Shift(fullBlock.NumberU64() - 1) - log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: + w.unconfirmed.Shift(block.NumberU64() - 1) + log.Info("Commit new mining work", "number", block.Number(), "txs", w.current.tcount, "uncles", len(uncles), + "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: log.Info("Worker has exited") } } - w.updateSnapshot() + if update { + w.updateSnapshot() + } + return nil } diff --git a/miner/worker_test.go b/miner/worker_test.go index 5823a608ef..408c47e3b3 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -59,7 +59,7 @@ func init() { ethashChainConfig = params.TestChainConfig cliqueChainConfig = params.TestChainConfig cliqueChainConfig.Clique = ¶ms.CliqueConfig{ - Period: 1, + Period: 10, Epoch: 30000, } tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey) @@ -74,6 +74,7 @@ type testWorkerBackend struct { txPool *core.TxPool chain *core.BlockChain testTxFeed event.Feed + uncleBlock *types.Block } func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) *testWorkerBackend { @@ -93,15 +94,19 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine default: t.Fatal("unexpect consensus engine type") } - gspec.MustCommit(db) + genesis := gspec.MustCommit(db) chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain) + blocks, _ := core.GenerateChain(chainConfig, genesis, engine, db, 1, func(i int, gen *core.BlockGen) { + gen.SetCoinbase(acc1Addr) + }) return &testWorkerBackend{ - db: db, - chain: chain, - txPool: txpool, + db: db, + chain: chain, + txPool: txpool, + uncleBlock: blocks[0], } } @@ -188,7 +193,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens taskCh <- struct{}{} } } - w.fullTaskInterval = func() { + w.fullTaskHook = func() { time.Sleep(100 * time.Millisecond) } @@ -202,11 +207,66 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens w.start() for i := 0; i < 2; i += 1 { - to := time.NewTimer(time.Second) select { case <-taskCh: - case <-to.C: + case <-time.NewTimer(time.Second).C: t.Error("new task timeout") } } } + +func TestStreamUncleBlock(t *testing.T) { + ethash := ethash.NewFaker() + defer ethash.Close() + + w, b := newTestWorker(t, ethashChainConfig, ethash) + defer w.close() + + var taskCh = make(chan struct{}) + + taskIndex := 0 + w.newTaskHook = func(task *task) { + if task.block.NumberU64() == 1 { + if taskIndex == 2 { + has := task.block.Header().UncleHash + want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()}) + if has != want { + t.Errorf("uncle hash mismatch, has %s, want %s", has.Hex(), want.Hex()) + } + } + taskCh <- struct{}{} + taskIndex += 1 + } + } + w.skipSealHook = func(task *task) bool { + return true + } + w.fullTaskHook = func() { + time.Sleep(100 * time.Millisecond) + } + + // Ensure worker has finished initialization + for { + b := w.pendingBlock() + if b != nil && b.NumberU64() == 1 { + break + } + } + + w.start() + // Ignore the first two works + for i := 0; i < 2; i += 1 { + select { + case <-taskCh: + case <-time.NewTimer(time.Second).C: + t.Error("new task timeout") + } + } + b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}}) + + select { + case <-taskCh: + case <-time.NewTimer(time.Second).C: + t.Error("new task timeout") + } +}