miner: use atomic type (#27013)

Use the new typed atomics in the miner package
This commit is contained in:
s7v7nislands 2023-03-31 14:32:47 +08:00 committed by GitHub
parent d0fbb10658
commit b92d0ea3bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 26 deletions

@ -164,7 +164,7 @@ const (
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct { type newWorkReq struct {
interrupt *int32 interrupt *atomic.Int32
noempty bool noempty bool
timestamp int64 timestamp int64
} }
@ -239,15 +239,15 @@ type worker struct {
snapshotState *state.StateDB snapshotState *state.StateDB
// atomic status counters // atomic status counters
running int32 // The indicator whether the consensus engine is running or not. running atomic.Bool // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting. newTxs atomic.Int32 // New arrival transaction count since last sealing work submitting.
// noempty is the flag used to control whether the feature of pre-seal empty // noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default). // block is enabled. The default value is false(pre-seal is enabled by default).
// But in some special scenario the consensus engine will seal blocks instantaneously, // But in some special scenario the consensus engine will seal blocks instantaneously,
// in this case this feature will add all empty blocks into canonical chain // in this case this feature will add all empty blocks into canonical chain
// non-stop and no real transaction will be included. // non-stop and no real transaction will be included.
noempty uint32 noempty atomic.Bool
// newpayloadTimeout is the maximum timeout allowance for creating payload. // newpayloadTimeout is the maximum timeout allowance for creating payload.
// The default value is 2 seconds but node operator can set it to arbitrary // The default value is 2 seconds but node operator can set it to arbitrary
@ -372,12 +372,12 @@ func (w *worker) setRecommitInterval(interval time.Duration) {
// disablePreseal disables pre-sealing feature // disablePreseal disables pre-sealing feature
func (w *worker) disablePreseal() { func (w *worker) disablePreseal() {
atomic.StoreUint32(&w.noempty, 1) w.noempty.Store(true)
} }
// enablePreseal enables pre-sealing feature // enablePreseal enables pre-sealing feature
func (w *worker) enablePreseal() { func (w *worker) enablePreseal() {
atomic.StoreUint32(&w.noempty, 0) w.noempty.Store(false)
} }
// pending returns the pending state and corresponding block. // pending returns the pending state and corresponding block.
@ -409,24 +409,24 @@ func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) {
// start sets the running status as 1 and triggers new work submitting. // start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() { func (w *worker) start() {
atomic.StoreInt32(&w.running, 1) w.running.Store(true)
w.startCh <- struct{}{} w.startCh <- struct{}{}
} }
// stop sets the running status as 0. // stop sets the running status as 0.
func (w *worker) stop() { func (w *worker) stop() {
atomic.StoreInt32(&w.running, 0) w.running.Store(false)
} }
// isRunning returns an indicator whether worker is running or not. // isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool { func (w *worker) isRunning() bool {
return atomic.LoadInt32(&w.running) == 1 return w.running.Load()
} }
// close terminates all background threads maintained by the worker. // close terminates all background threads maintained by the worker.
// Note the worker does not support being closed multiple times. // Note the worker does not support being closed multiple times.
func (w *worker) close() { func (w *worker) close() {
atomic.StoreInt32(&w.running, 0) w.running.Store(false)
close(w.exitCh) close(w.exitCh)
w.wg.Wait() w.wg.Wait()
} }
@ -457,7 +457,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
func (w *worker) newWorkLoop(recommit time.Duration) { func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done() defer w.wg.Done()
var ( var (
interrupt *int32 interrupt *atomic.Int32
minRecommit = recommit // minimal resubmit interval specified by user. minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of sealing. timestamp int64 // timestamp for each round of sealing.
) )
@ -469,16 +469,16 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// commit aborts in-flight transaction execution with given signal and resubmits a new one. // commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) { commit := func(noempty bool, s int32) {
if interrupt != nil { if interrupt != nil {
atomic.StoreInt32(interrupt, s) interrupt.Store(s)
} }
interrupt = new(int32) interrupt = new(atomic.Int32)
select { select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh: case <-w.exitCh:
return return
} }
timer.Reset(recommit) timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0) w.newTxs.Store(0)
} }
// clearPending cleans the stale pending tasks. // clearPending cleans the stale pending tasks.
clearPending := func(number uint64) { clearPending := func(number uint64) {
@ -508,7 +508,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// higher priced transactions. Disable this overhead for pending blocks. // higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) { if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
// Short circuit if no new transaction arrives. // Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 { if w.newTxs.Load() == 0 {
timer.Reset(recommit) timer.Reset(recommit)
continue continue
} }
@ -650,7 +650,7 @@ func (w *worker) mainLoop() {
w.commitWork(nil, true, time.Now().Unix()) w.commitWork(nil, true, time.Now().Unix())
} }
} }
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) w.newTxs.Add(int32(len(ev.Txs)))
// System stopped // System stopped
case <-w.exitCh: case <-w.exitCh:
@ -877,7 +877,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil return receipt.Logs, nil
} }
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit gasLimit := env.header.GasLimit
if env.gasPool == nil { if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit) env.gasPool = new(core.GasPool).AddGas(gasLimit)
@ -887,7 +887,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
for { for {
// Check interruption signal and abort building if it's fired. // Check interruption signal and abort building if it's fired.
if interrupt != nil { if interrupt != nil {
if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone { if signal := interrupt.Load(); signal != commitInterruptNone {
return signalToErr(signal) return signalToErr(signal)
} }
} }
@ -1067,7 +1067,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them // fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can // into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future. // be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) error { func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error {
// Split the pending transactions into locals and remotes // Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions. // Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true) pending := w.eth.TxPool().Pending(true)
@ -1102,9 +1102,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e
defer work.discard() defer work.discard()
if !params.noTxs { if !params.noTxs {
interrupt := new(int32) interrupt := new(atomic.Int32)
timer := time.AfterFunc(w.newpayloadTimeout, func() { timer := time.AfterFunc(w.newpayloadTimeout, func() {
atomic.StoreInt32(interrupt, commitInterruptTimeout) interrupt.Store(commitInterruptTimeout)
}) })
defer timer.Stop() defer timer.Stop()
@ -1122,7 +1122,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e
// commitWork generates several new sealing tasks based on the parent block // commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer. // and submit them to the sealer.
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int64) {
start := time.Now() start := time.Now()
// Set the coinbase if the worker is running or it's required // Set the coinbase if the worker is running or it's required
@ -1143,7 +1143,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
} }
// Create an empty block based on temporary copied state for // Create an empty block based on temporary copied state for
// sealing in advance without waiting block execution finished. // sealing in advance without waiting block execution finished.
if !noempty && atomic.LoadUint32(&w.noempty) == 0 { if !noempty && !w.noempty.Load() {
w.commit(work.copy(), nil, false, start) w.commit(work.copy(), nil, false, start)
} }
// Fill pending transactions from the txpool into the block. // Fill pending transactions from the txpool into the block.

@ -454,11 +454,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
progress = make(chan struct{}, 10) progress = make(chan struct{}, 10)
result = make([]float64, 0, 10) result = make([]float64, 0, 10)
index = 0 index = 0
start uint32 start atomic.Bool
) )
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started. // Short circuit if interval checking hasn't started.
if atomic.LoadUint32(&start) == 0 { if !start.Load() {
return return
} }
var wantMinInterval, wantRecommitInterval time.Duration var wantMinInterval, wantRecommitInterval time.Duration
@ -493,7 +493,7 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
w.start() w.start()
time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt
atomic.StoreUint32(&start, 1) start.Store(true)
w.setRecommitInterval(3 * time.Second) w.setRecommitInterval(3 * time.Second)
select { select {