diff --git a/miner/worker.go b/miner/worker.go index 67a5842d23..9fbece8658 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -164,7 +164,7 @@ const ( // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 + interrupt *atomic.Int32 noempty bool timestamp int64 } @@ -239,15 +239,15 @@ type worker struct { snapshotState *state.StateDB // atomic status counters - running int32 // The indicator whether the consensus engine is running or not. - newTxs int32 // New arrival transaction count since last sealing work submitting. + running atomic.Bool // The indicator whether the consensus engine is running or not. + newTxs atomic.Int32 // New arrival transaction count since last sealing work submitting. // noempty is the flag used to control whether the feature of pre-seal empty // block is enabled. The default value is false(pre-seal is enabled by default). // But in some special scenario the consensus engine will seal blocks instantaneously, // in this case this feature will add all empty blocks into canonical chain // non-stop and no real transaction will be included. - noempty uint32 + noempty atomic.Bool // newpayloadTimeout is the maximum timeout allowance for creating payload. // The default value is 2 seconds but node operator can set it to arbitrary @@ -372,12 +372,12 @@ func (w *worker) setRecommitInterval(interval time.Duration) { // disablePreseal disables pre-sealing feature func (w *worker) disablePreseal() { - atomic.StoreUint32(&w.noempty, 1) + w.noempty.Store(true) } // enablePreseal enables pre-sealing feature func (w *worker) enablePreseal() { - atomic.StoreUint32(&w.noempty, 0) + w.noempty.Store(false) } // pending returns the pending state and corresponding block. @@ -409,24 +409,24 @@ func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) { // start sets the running status as 1 and triggers new work submitting. func (w *worker) start() { - atomic.StoreInt32(&w.running, 1) + w.running.Store(true) w.startCh <- struct{}{} } // stop sets the running status as 0. func (w *worker) stop() { - atomic.StoreInt32(&w.running, 0) + w.running.Store(false) } // isRunning returns an indicator whether worker is running or not. func (w *worker) isRunning() bool { - return atomic.LoadInt32(&w.running) == 1 + return w.running.Load() } // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { - atomic.StoreInt32(&w.running, 0) + w.running.Store(false) close(w.exitCh) w.wg.Wait() } @@ -457,7 +457,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 + interrupt *atomic.Int32 minRecommit = recommit // minimal resubmit interval specified by user. timestamp int64 // timestamp for each round of sealing. ) @@ -469,16 +469,16 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // commit aborts in-flight transaction execution with given signal and resubmits a new one. commit := func(noempty bool, s int32) { if interrupt != nil { - atomic.StoreInt32(interrupt, s) + interrupt.Store(s) } - interrupt = new(int32) + interrupt = new(atomic.Int32) select { case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } timer.Reset(recommit) - atomic.StoreInt32(&w.newTxs, 0) + w.newTxs.Store(0) } // clearPending cleans the stale pending tasks. clearPending := func(number uint64) { @@ -508,7 +508,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // higher priced transactions. Disable this overhead for pending blocks. if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) { // Short circuit if no new transaction arrives. - if atomic.LoadInt32(&w.newTxs) == 0 { + if w.newTxs.Load() == 0 { timer.Reset(recommit) continue } @@ -650,7 +650,7 @@ func (w *worker) mainLoop() { w.commitWork(nil, true, time.Now().Unix()) } } - atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) + w.newTxs.Add(int32(len(ev.Txs))) // System stopped case <-w.exitCh: @@ -877,7 +877,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -887,7 +887,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP for { // Check interruption signal and abort building if it's fired. if interrupt != nil { - if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone { + if signal := interrupt.Load(); signal != commitInterruptNone { return signalToErr(signal) } } @@ -1067,7 +1067,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) error { +func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1102,9 +1102,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e defer work.discard() if !params.noTxs { - interrupt := new(int32) + interrupt := new(atomic.Int32) timer := time.AfterFunc(w.newpayloadTimeout, func() { - atomic.StoreInt32(interrupt, commitInterruptTimeout) + interrupt.Store(commitInterruptTimeout) }) defer timer.Stop() @@ -1122,7 +1122,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1143,7 +1143,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } // Create an empty block based on temporary copied state for // sealing in advance without waiting block execution finished. - if !noempty && atomic.LoadUint32(&w.noempty) == 0 { + if !noempty && !w.noempty.Load() { w.commit(work.copy(), nil, false, start) } // Fill pending transactions from the txpool into the block. diff --git a/miner/worker_test.go b/miner/worker_test.go index e60de67932..9db64a240d 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -454,11 +454,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co progress = make(chan struct{}, 10) result = make([]float64, 0, 10) index = 0 - start uint32 + start atomic.Bool ) w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { // Short circuit if interval checking hasn't started. - if atomic.LoadUint32(&start) == 0 { + if !start.Load() { return } var wantMinInterval, wantRecommitInterval time.Duration @@ -493,7 +493,7 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co w.start() time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt - atomic.StoreUint32(&start, 1) + start.Store(true) w.setRecommitInterval(3 * time.Second) select {