worker: remove the code of resubmit interval adjust

resubmit intervalAdjust is for PoW only, to remove it to make worker simpler.

With PoW, there will be a periodic timer to check if it is the time to stop
packing transaction and start calculating the desired hash value, since other miner
could succeed in hash compute if it spends too much time packing transactions.
It will commit the current fruit to calculate root at a reasonable time.
And it will schedule a new work to get a big block if new transaction was received.

When there are too many transactions in the TxPool, the interval of the resubmit timer would be
increased and vice versa.

But it is not needed with PoS related consensus, since the block interval is determined in PoS,
and there is already a timer to stop too long packing.
This commit is contained in:
setunapo 2022-11-10 11:07:12 +08:00 committed by Larry
parent bf1dbd9233
commit b4dcff5772
2 changed files with 0 additions and 173 deletions

@ -53,9 +53,6 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent. // chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10 chainSideChanSize = 10
// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
resubmitAdjustChanSize = 10
// sealingLogAtDepth is the number of confirmations before logging successful mining. // sealingLogAtDepth is the number of confirmations before logging successful mining.
sealingLogAtDepth = 11 sealingLogAtDepth = 11
@ -63,18 +60,6 @@ const (
// any newly arrived transactions. // any newly arrived transactions.
minRecommitInterval = 1 * time.Second minRecommitInterval = 1 * time.Second
// maxRecommitInterval is the maximum time interval to recreate the sealing block with
// any newly arrived transactions.
maxRecommitInterval = 15 * time.Second
// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
// resubmitting interval.
intervalAdjustRatio = 0.1
// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0
// staleThreshold is the maximum depth of the acceptable stale block. // staleThreshold is the maximum depth of the acceptable stale block.
staleThreshold = 11 staleThreshold = 11
) )
@ -177,12 +162,6 @@ type getWorkReq struct {
result chan *types.Block result chan *types.Block
} }
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
ratio float64
inc bool
}
// worker is the main object which takes care of submitting new work to consensus engine // worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result. // and gathering the sealing result.
type worker struct { type worker struct {
@ -213,7 +192,6 @@ type worker struct {
startCh chan struct{} startCh chan struct{}
exitCh chan struct{} exitCh chan struct{}
resubmitIntervalCh chan time.Duration resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
wg sync.WaitGroup wg sync.WaitGroup
@ -279,7 +257,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
exitCh: make(chan struct{}), exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1), startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration), resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
} }
// Subscribe NewTxsEvent for tx pool // Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
@ -396,28 +373,6 @@ func (w *worker) close() {
w.wg.Wait() w.wg.Wait()
} }
// recalcRecommit recalculates the resubmitting interval upon feedback.
func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
var (
prevF = float64(prev.Nanoseconds())
next float64
)
if inc {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
max := float64(maxRecommitInterval.Nanoseconds())
if next > max {
next = max
}
} else {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
min := float64(minRecommit.Nanoseconds())
if next < min {
next = min
}
}
return time.Duration(int64(next))
}
// newWorkLoop is a standalone goroutine to submit new sealing work upon received events. // newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) { func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done() defer w.wg.Done()
@ -508,23 +463,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
w.resubmitHook(minRecommit, recommit) w.resubmitHook(minRecommit, recommit)
} }
case adjust := <-w.resubmitAdjustCh:
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
target := float64(recommit.Nanoseconds()) / adjust.ratio
recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}
if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}
case <-w.exitCh: case <-w.exitCh:
return return
} }
@ -901,17 +839,6 @@ LOOP:
// For the first two cases, the semi-finished work will be discarded. // For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine. // For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead return atomic.LoadInt32(interrupt) == commitInterruptNewHead
} }
// If we don't have enough gas for any further transactions then we're done // If we don't have enough gas for any further transactions then we're done
@ -998,11 +925,6 @@ LOOP:
} }
w.pendingLogsFeed.Send(cpy) w.pendingLogsFeed.Send(cpy)
} }
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false return false
} }

@ -20,7 +20,6 @@ import (
"errors" "errors"
"math/big" "math/big"
"math/rand" "math/rand"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
} }
} }
func TestAdjustIntervalEthash(t *testing.T) {
testAdjustInterval(t, ethashChainConfig, ethash.NewFaker())
}
func TestAdjustIntervalClique(t *testing.T) {
testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}
func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()
w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
defer w.close()
w.skipSealHook = func(task *task) bool {
return true
}
w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
var (
progress = make(chan struct{}, 10)
result = make([]float64, 0, 10)
index = 0
start uint32
)
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started.
if atomic.LoadUint32(&start) == 0 {
return
}
var wantMinInterval, wantRecommitInterval time.Duration
switch index {
case 0:
wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second
case 1:
origin := float64(3 * time.Second.Nanoseconds())
estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 2:
estimate := result[index-1]
min := float64(3 * time.Second.Nanoseconds())
estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 3:
wantMinInterval, wantRecommitInterval = time.Second, time.Second
}
// Check interval
if minInterval != wantMinInterval {
t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval)
}
if recommitInterval != wantRecommitInterval {
t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval)
}
result = append(result, float64(recommitInterval.Nanoseconds()))
index += 1
progress <- struct{}{}
}
w.start()
time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt
atomic.StoreUint32(&start, 1)
w.setRecommitInterval(3 * time.Second)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.setRecommitInterval(500 * time.Millisecond)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
}
func TestGetSealingWorkEthash(t *testing.T) { func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false) testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
} }