From b4dcff5772d48e61cf783f41d2fb8dce8ce60236 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 10 Nov 2022 11:07:12 +0800 Subject: [PATCH] worker: remove the code of resubmit interval adjust resubmit intervalAdjust is for PoW only, to remove it to make worker simpler. With PoW, there will be a periodic timer to check if it is the time to stop packing transaction and start calculating the desired hash value, since other miner could succeed in hash compute if it spends too much time packing transactions. It will commit the current fruit to calculate root at a reasonable time. And it will schedule a new work to get a big block if new transaction was received. When there are too many transactions in the TxPool, the interval of the resubmit timer would be increased and vice versa. But it is not needed with PoS related consensus, since the block interval is determined in PoS, and there is already a timer to stop too long packing. --- miner/worker.go | 78 ------------------------------------ miner/worker_test.go | 95 -------------------------------------------- 2 files changed, 173 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 32d8dbd72..0838defdb 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -53,9 +53,6 @@ const ( // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 - // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. - resubmitAdjustChanSize = 10 - // sealingLogAtDepth is the number of confirmations before logging successful mining. sealingLogAtDepth = 11 @@ -63,18 +60,6 @@ const ( // any newly arrived transactions. minRecommitInterval = 1 * time.Second - // maxRecommitInterval is the maximum time interval to recreate the sealing block with - // any newly arrived transactions. - maxRecommitInterval = 15 * time.Second - - // intervalAdjustRatio is the impact a single interval adjustment has on sealing work - // resubmitting interval. - intervalAdjustRatio = 0.1 - - // intervalAdjustBias is applied during the new resubmit interval calculation in favor of - // increasing upper limit or decreasing lower limit so that the limit can be reachable. - intervalAdjustBias = 200 * 1000.0 * 1000.0 - // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 ) @@ -177,12 +162,6 @@ type getWorkReq struct { result chan *types.Block } -// intervalAdjust represents a resubmitting interval adjustment. -type intervalAdjust struct { - ratio float64 - inc bool -} - // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { @@ -213,7 +192,6 @@ type worker struct { startCh chan struct{} exitCh chan struct{} resubmitIntervalCh chan time.Duration - resubmitAdjustCh chan *intervalAdjust wg sync.WaitGroup @@ -279,7 +257,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -396,28 +373,6 @@ func (w *worker) close() { w.wg.Wait() } -// recalcRecommit recalculates the resubmitting interval upon feedback. -func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration { - var ( - prevF = float64(prev.Nanoseconds()) - next float64 - ) - if inc { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) - max := float64(maxRecommitInterval.Nanoseconds()) - if next > max { - next = max - } - } else { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) - min := float64(minRecommit.Nanoseconds()) - if next < min { - next = min - } - } - return time.Duration(int64(next)) -} - // newWorkLoop is a standalone goroutine to submit new sealing work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() @@ -508,23 +463,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { w.resubmitHook(minRecommit, recommit) } - case adjust := <-w.resubmitAdjustCh: - // Adjust resubmit interval by feedback. - if adjust.inc { - before := recommit - target := float64(recommit.Nanoseconds()) / adjust.ratio - recommit = recalcRecommit(minRecommit, recommit, target, true) - log.Trace("Increase miner recommit interval", "from", before, "to", recommit) - } else { - before := recommit - recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false) - log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) - } - - if w.resubmitHook != nil { - w.resubmitHook(minRecommit, recommit) - } - case <-w.exitCh: return } @@ -901,17 +839,6 @@ LOOP: // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -998,11 +925,6 @@ LOOP: } w.pendingLogsFeed.Send(cpy) } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } return false } diff --git a/miner/worker_test.go b/miner/worker_test.go index 2cdd4284e..e405788c6 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -20,7 +20,6 @@ import ( "errors" "math/big" "math/rand" - "sync/atomic" "testing" "time" @@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { } } -func TestAdjustIntervalEthash(t *testing.T) { - testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestAdjustIntervalClique(t *testing.T) { - testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - var ( - progress = make(chan struct{}, 10) - result = make([]float64, 0, 10) - index = 0 - start uint32 - ) - w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { - // Short circuit if interval checking hasn't started. - if atomic.LoadUint32(&start) == 0 { - return - } - var wantMinInterval, wantRecommitInterval time.Duration - - switch index { - case 0: - wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second - case 1: - origin := float64(3 * time.Second.Nanoseconds()) - estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 2: - estimate := result[index-1] - min := float64(3 * time.Second.Nanoseconds()) - estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 3: - wantMinInterval, wantRecommitInterval = time.Second, time.Second - } - - // Check interval - if minInterval != wantMinInterval { - t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval) - } - if recommitInterval != wantRecommitInterval { - t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval) - } - result = append(result, float64(recommitInterval.Nanoseconds())) - index += 1 - progress <- struct{}{} - } - w.start() - - time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt - atomic.StoreUint32(&start, 1) - - w.setRecommitInterval(3 * time.Second) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.setRecommitInterval(500 * time.Millisecond) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } -} - func TestGetSealingWorkEthash(t *testing.T) { testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false) }