parlia: Delay() with DelayLeftOver

Right now, DelayLeftOver is used to reserve time for block finalize, not block
broadcast. And the code does not work as expected.

The general block generation could be described as:
|- fillTransactions -|- finalize a block -|- wait until the period(3s) reached -|- broadcast -|
This commit is contained in:
setunapo 2022-11-15 15:42:29 +08:00 committed by Larry
parent d1ed977d89
commit fe1c8622f6
8 changed files with 33 additions and 18 deletions

@ -548,7 +548,7 @@ var (
} }
MinerDelayLeftoverFlag = cli.DurationFlag{ MinerDelayLeftoverFlag = cli.DurationFlag{
Name: "miner.delayleftover", Name: "miner.delayleftover",
Usage: "Time interval to for broadcast block", Usage: "Time reserved to finalize a block",
Value: ethconfig.Defaults.Miner.DelayLeftOver, Value: ethconfig.Defaults.Miner.DelayLeftOver,
} }
MinerNoVerfiyFlag = cli.BoolFlag{ MinerNoVerfiyFlag = cli.BoolFlag{

@ -264,7 +264,7 @@ func (beacon *Beacon) Prepare(chain consensus.ChainHeaderReader, header *types.H
return nil return nil
} }
func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration { func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil return nil
} }

@ -591,7 +591,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) {
c.signFn = signFn c.signFn = signFn
} }
func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration { func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
return nil return nil
} }

@ -126,7 +126,7 @@ type Engine interface {
APIs(chain ChainHeaderReader) []rpc.API APIs(chain ChainHeaderReader) []rpc.API
// Delay returns the max duration the miner can commit txs // Delay returns the max duration the miner can commit txs
Delay(chain ChainReader, header *types.Header) *time.Duration Delay(chain ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration
// Close terminates any background threads maintained by the consensus engine. // Close terminates any background threads maintained by the consensus engine.
Close() error Close() error

@ -610,7 +610,7 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), receipts, nil return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), receipts, nil
} }
func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration { func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil return nil
} }

@ -793,13 +793,25 @@ func (p *Parlia) Authorize(val common.Address, signFn SignerFn, signTxFn SignerT
p.signTxFn = signTxFn p.signTxFn = signTxFn
} }
func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration { // Argument leftOver is the time reserved for block finalize(calculate root, distribute income...)
func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
number := header.Number.Uint64() number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil) snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
if err != nil { if err != nil {
return nil return nil
} }
delay := p.delayForRamanujanFork(snap, header) delay := p.delayForRamanujanFork(snap, header)
if *leftOver >= time.Duration(p.config.Period)*time.Second {
// ignore invalid leftOver
log.Error("Delay invalid argument", "leftOver", leftOver.String(), "Period", p.config.Period)
} else if *leftOver >= delay {
delay = time.Duration(0)
return &delay
} else {
delay = delay - *leftOver
}
// The blocking time should be no more than half of period // The blocking time should be no more than half of period
half := time.Duration(p.config.Period) * time.Second / 2 half := time.Duration(p.config.Period) * time.Second / 2
if delay > half { if delay > half {

@ -48,7 +48,7 @@ type Config struct {
Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash).
NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages
ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner
DelayLeftOver time.Duration // Time for broadcast block DelayLeftOver time.Duration // Time reserved to finalize a block(calculate root, distribute income...)
GasFloor uint64 // Target gas floor for mined blocks. GasFloor uint64 // Target gas floor for mined blocks.
GasCeil uint64 // Target gas ceiling for mined blocks. GasCeil uint64 // Target gas ceiling for mined blocks.
GasPrice *big.Int // Minimum gas price for mining a transaction GasPrice *big.Int // Minimum gas price for mining a transaction

@ -560,7 +560,7 @@ func (w *worker) mainLoop() {
} }
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil) w.commitTransactions(w.current, txset, nil, nil)
commitTxsTimer.UpdateSince(start) commitTxsTimer.UpdateSince(start)
// Only update the snapshot if any new transactions were added // Only update the snapshot if any new transactions were added
@ -797,7 +797,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
return receipt.Logs, nil return receipt.Logs, nil
} }
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool { func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
gasLimit := env.header.GasLimit gasLimit := env.header.GasLimit
if env.gasPool == nil { if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit) env.gasPool = new(core.GasPool).AddGas(gasLimit)
@ -809,13 +810,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
} }
var coalescedLogs []*types.Log var coalescedLogs []*types.Log
var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header)
if delay != nil {
stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver)
log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
defer stopTimer.Stop()
}
// initilise bloom processors // initilise bloom processors
processorCapacity := 100 processorCapacity := 100
if txs.CurrentSize() < processorCapacity { if txs.CurrentSize() < processorCapacity {
@ -1048,15 +1042,24 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
localTxs[account] = txs localTxs[account] = txs
} }
} }
var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}
if len(localTxs) > 0 { if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) { if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return return
} }
} }
if len(remoteTxs) > 0 { if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) { if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return return
} }
} }