eth, miner: prefer locally generated uncles vs remote ones (#17715)

* core, eth: fix dependency cycle

* eth, miner: perfer to locally generated uncle
This commit is contained in:
gary rong 2018-09-21 05:11:55 +08:00 committed by Péter Szilágyi
parent ba0a8b7887
commit 457e930f27
4 changed files with 53 additions and 27 deletions

@ -177,7 +177,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return nil, err return nil, err
} }
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil) eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.MinerExtraData)) eth.miner.SetExtra(makeExtraData(config.MinerExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil} eth.APIBackend = &EthAPIBackend{eth, nil}

@ -52,13 +52,13 @@ type Miner struct {
shouldStart int32 // should start indicates whether we should start after sync shouldStart int32 // should start indicates whether we should start after sync
} }
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64) *Miner { func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner {
miner := &Miner{ miner := &Miner{
eth: eth, eth: eth,
mux: mux, mux: mux,
engine: engine, engine: engine,
exitCh: make(chan struct{}), exitCh: make(chan struct{}),
worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil), worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock),
canStart: 1, canStart: 1,
} }
go miner.update() go miner.update()

@ -149,9 +149,10 @@ type worker struct {
resubmitIntervalCh chan time.Duration resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust resubmitAdjustCh chan *intervalAdjust
current *environment // An environment for current running cycle. current *environment // An environment for current running cycle.
possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations.
mu sync.RWMutex // The lock used to protect the coinbase and extra fields mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address coinbase common.Address
@ -168,6 +169,9 @@ type worker struct {
running int32 // The indicator whether the consensus engine is running or not. running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting. newTxs int32 // New arrival transaction count since last sealing work submitting.
// External functions
isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.
// Test hooks // Test hooks
newTaskHook func(*task) // Method to call upon receiving a new sealing task. newTaskHook func(*task) // Method to call upon receiving a new sealing task.
skipSealHook func(*task) bool // Method to decide whether skipping the sealing. skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
@ -175,7 +179,7 @@ type worker struct {
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
} }
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64) *worker { func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker {
worker := &worker{ worker := &worker{
config: config, config: config,
engine: engine, engine: engine,
@ -184,7 +188,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
chain: eth.BlockChain(), chain: eth.BlockChain(),
gasFloor: gasFloor, gasFloor: gasFloor,
gasCeil: gasCeil, gasCeil: gasCeil,
possibleUncles: make(map[common.Hash]*types.Block), isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
pendingTasks: make(map[common.Hash]*task), pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize), txsCh: make(chan core.NewTxsEvent, txChanSize),
@ -405,11 +411,19 @@ func (w *worker) mainLoop() {
w.commitNewWork(req.interrupt, req.noempty, req.timestamp) w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
case ev := <-w.chainSideCh: case ev := <-w.chainSideCh:
if _, exist := w.possibleUncles[ev.Block.Hash()]; exist { // Short circuit for duplicate side blocks
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
continue continue
} }
// Add side block to possible uncle block set. if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
w.possibleUncles[ev.Block.Hash()] = ev.Block continue
}
// Add side block to possible uncle block set depending on the author.
if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
w.localUncles[ev.Block.Hash()] = ev.Block
} else {
w.remoteUncles[ev.Block.Hash()] = ev.Block
}
// If our mining block contains less than 2 uncle blocks, // If our mining block contains less than 2 uncle blocks,
// add the new uncle block if valid and regenerate a mining block. // add the new uncle block if valid and regenerate a mining block.
if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 { if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
@ -421,7 +435,10 @@ func (w *worker) mainLoop() {
if !ok { if !ok {
return false return false
} }
uncle, exist := w.possibleUncles[hash] uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist { if !exist {
return false return false
} }
@ -651,7 +668,10 @@ func (w *worker) updateSnapshot() {
if !ok { if !ok {
return false return false
} }
uncle, exist := w.possibleUncles[hash] uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist { if !exist {
return false return false
} }
@ -859,23 +879,29 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
misc.ApplyDAOHardFork(env.state) misc.ApplyDAOHardFork(env.state)
} }
// Accumulate the uncles for the current block // Accumulate the uncles for the current block
for hash, uncle := range w.possibleUncles {
if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
delete(w.possibleUncles, hash)
}
}
uncles := make([]*types.Header, 0, 2) uncles := make([]*types.Header, 0, 2)
for hash, uncle := range w.possibleUncles { commitUncles := func(blocks map[common.Hash]*types.Block) {
if len(uncles) == 2 { // Clean up stale uncle blocks first
break for hash, uncle := range blocks {
if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
delete(blocks, hash)
}
} }
if err := w.commitUncle(env, uncle.Header()); err != nil { for hash, uncle := range blocks {
log.Trace("Possible uncle rejected", "hash", hash, "reason", err) if len(uncles) == 2 {
} else { break
log.Debug("Committing new uncle to block", "hash", hash) }
uncles = append(uncles, uncle.Header()) if err := w.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
} }
} }
// Prefer to locally generated uncle
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)
if !noempty { if !noempty {
// Create an empty block based on temporary copied state for sealing in advance without waiting block // Create an empty block based on temporary copied state for sealing in advance without waiting block

@ -133,7 +133,7 @@ func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, blocks) backend := newTestWorkerBackend(t, chainConfig, engine, blocks)
backend.txPool.AddLocals(pendingTxs) backend.txPool.AddLocals(pendingTxs)
w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second, params.GenesisGasLimit, params.GenesisGasLimit) w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second, params.GenesisGasLimit, params.GenesisGasLimit, nil)
w.setEtherbase(testBankAddress) w.setEtherbase(testBankAddress)
return w, backend return w, backend
} }