consensus/ethash: move remote agent logic to ethash internal (#15853)

* consensus/ethash: start remote ggoroutine to handle remote mining

* consensus/ethash: expose remote miner api

* consensus/ethash: expose submitHashrate api

* miner, ethash: push empty block to sealer without waiting execution

* consensus, internal: add getHashrate API for ethash

* consensus: add three method for consensus interface

* miner: expose consensus engine running status to miner

* eth, miner: specify etherbase when miner created

* miner: commit new work when consensus engine is started

* consensus, miner: fix some logics

* all: delete useless interfaces

* consensus: polish a bit
This commit is contained in:
gary rong 2018-08-03 16:33:37 +08:00 committed by Péter Szilágyi
parent 70176cda0e
commit 51db5975cc
16 changed files with 608 additions and 361 deletions

@ -31,7 +31,7 @@ import (
) )
const ( const (
ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0" ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 ethash:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0"
httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0" httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0"
) )

@ -672,6 +672,11 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int {
return new(big.Int).Set(diffNoTurn) return new(big.Int).Set(diffNoTurn)
} }
// Close implements consensus.Engine. It's a noop for clique as there is are no background threads.
func (c *Clique) Close() error {
return nil
}
// APIs implements consensus.Engine, returning the user facing RPC API to allow // APIs implements consensus.Engine, returning the user facing RPC API to allow
// controlling the signer voting. // controlling the signer voting.
func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API { func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API {

@ -96,6 +96,9 @@ type Engine interface {
// APIs returns the RPC APIs this consensus engine provides. // APIs returns the RPC APIs this consensus engine provides.
APIs(chain ChainReader) []rpc.API APIs(chain ChainReader) []rpc.API
// Close terminates any background threads maintained by the consensus engine.
Close() error
} }
// PoW is a consensus engine based on proof-of-work. // PoW is a consensus engine based on proof-of-work.

@ -730,6 +730,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) {
go func(idx int) { go func(idx int) {
defer pend.Done() defer pend.Done()
ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal})
defer ethash.Close()
if err := ethash.VerifySeal(nil, block.Header()); err != nil { if err := ethash.VerifySeal(nil, block.Header()); err != nil {
t.Errorf("proc %d: block verification failed: %v", idx, err) t.Errorf("proc %d: block verification failed: %v", idx, err)
} }

117
consensus/ethash/api.go Normal file

@ -0,0 +1,117 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package ethash
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)
var errEthashStopped = errors.New("ethash stopped")
// API exposes ethash related methods for the RPC interface.
type API struct {
ethash *Ethash // Make sure the mode of ethash is normal.
}
// GetWork returns a work package for external miner.
//
// The work package consists of 3 strings:
// result[0] - 32 bytes hex encoded current block header pow-hash
// result[1] - 32 bytes hex encoded seed hash used for DAG
// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
func (api *API) GetWork() ([3]string, error) {
if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest {
return [3]string{}, errors.New("not supported")
}
var (
workCh = make(chan [3]string, 1)
errc = make(chan error, 1)
)
select {
case api.ethash.fetchWorkCh <- &sealWork{errc: errc, res: workCh}:
case <-api.ethash.exitCh:
return [3]string{}, errEthashStopped
}
select {
case work := <-workCh:
return work, nil
case err := <-errc:
return [3]string{}, err
}
}
// SubmitWork can be used by external miner to submit their POW solution.
// It returns an indication if the work was accepted.
// Note either an invalid solution, a stale work a non-existent work will return false.
func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool {
if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest {
return false
}
var errc = make(chan error, 1)
select {
case api.ethash.submitWorkCh <- &mineResult{
nonce: nonce,
mixDigest: digest,
hash: hash,
errc: errc,
}:
case <-api.ethash.exitCh:
return false
}
err := <-errc
return err == nil
}
// SubmitHashrate can be used for remote miners to submit their hash rate.
// This enables the node to report the combined hash rate of all miners
// which submit work through this node.
//
// It accepts the miner hash rate and an identifier which must be unique
// between nodes.
func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool {
if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest {
return false
}
var done = make(chan struct{}, 1)
select {
case api.ethash.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}:
case <-api.ethash.exitCh:
return false
}
// Block until hash rate submitted successfully.
<-done
return true
}
// GetHashrate returns the current hashrate for local CPU miner and remote miner.
func (api *API) GetHashrate() uint64 {
return uint64(api.ethash.Hashrate())
}

@ -33,7 +33,9 @@ import (
"unsafe" "unsafe"
mmap "github.com/edsrzf/mmap-go" mmap "github.com/edsrzf/mmap-go"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -389,6 +391,30 @@ type Config struct {
PowMode Mode PowMode Mode
} }
// mineResult wraps the pow solution parameters for the specified block.
type mineResult struct {
nonce types.BlockNonce
mixDigest common.Hash
hash common.Hash
errc chan error
}
// hashrate wraps the hash rate submitted by the remote sealer.
type hashrate struct {
id common.Hash
ping time.Time
rate uint64
done chan struct{}
}
// sealWork wraps a seal work package for remote sealer.
type sealWork struct {
errc chan error
res chan [3]string
}
// Ethash is a consensus engine based on proof-of-work implementing the ethash // Ethash is a consensus engine based on proof-of-work implementing the ethash
// algorithm. // algorithm.
type Ethash struct { type Ethash struct {
@ -403,15 +429,25 @@ type Ethash struct {
update chan struct{} // Notification channel to update mining parameters update chan struct{} // Notification channel to update mining parameters
hashrate metrics.Meter // Meter tracking the average hashrate hashrate metrics.Meter // Meter tracking the average hashrate
// Remote sealer related fields
workCh chan *types.Block // Notification channel to push new work to remote sealer
resultCh chan *types.Block // Channel used by mining threads to return result
fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work
submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result
fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer.
submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate
// The fields below are hooks for testing // The fields below are hooks for testing
shared *Ethash // Shared PoW verifier to avoid cache regeneration shared *Ethash // Shared PoW verifier to avoid cache regeneration
fakeFail uint64 // Block number which fails PoW check even in fake mode fakeFail uint64 // Block number which fails PoW check even in fake mode
fakeDelay time.Duration // Time delay to sleep for before returning from verify fakeDelay time.Duration // Time delay to sleep for before returning from verify
lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields
closeOnce sync.Once // Ensures exit channel will not be closed twice.
exitCh chan chan error // Notification channel to exiting backend threads
} }
// New creates a full sized ethash PoW scheme. // New creates a full sized ethash PoW scheme and starts a background thread for remote mining.
func New(config Config) *Ethash { func New(config Config) *Ethash {
if config.CachesInMem <= 0 { if config.CachesInMem <= 0 {
log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem)
@ -423,19 +459,43 @@ func New(config Config) *Ethash {
if config.DatasetDir != "" && config.DatasetsOnDisk > 0 { if config.DatasetDir != "" && config.DatasetsOnDisk > 0 {
log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk)
} }
return &Ethash{ ethash := &Ethash{
config: config, config: config,
caches: newlru("cache", config.CachesInMem, newCache), caches: newlru("cache", config.CachesInMem, newCache),
datasets: newlru("dataset", config.DatasetsInMem, newDataset), datasets: newlru("dataset", config.DatasetsInMem, newDataset),
update: make(chan struct{}), update: make(chan struct{}),
hashrate: metrics.NewMeter(), hashrate: metrics.NewMeter(),
workCh: make(chan *types.Block),
resultCh: make(chan *types.Block),
fetchWorkCh: make(chan *sealWork),
submitWorkCh: make(chan *mineResult),
fetchRateCh: make(chan chan uint64),
submitRateCh: make(chan *hashrate),
exitCh: make(chan chan error),
} }
go ethash.remote()
return ethash
} }
// NewTester creates a small sized ethash PoW scheme useful only for testing // NewTester creates a small sized ethash PoW scheme useful only for testing
// purposes. // purposes.
func NewTester() *Ethash { func NewTester() *Ethash {
return New(Config{CachesInMem: 1, PowMode: ModeTest}) ethash := &Ethash{
config: Config{PowMode: ModeTest},
caches: newlru("cache", 1, newCache),
datasets: newlru("dataset", 1, newDataset),
update: make(chan struct{}),
hashrate: metrics.NewMeter(),
workCh: make(chan *types.Block),
resultCh: make(chan *types.Block),
fetchWorkCh: make(chan *sealWork),
submitWorkCh: make(chan *mineResult),
fetchRateCh: make(chan chan uint64),
submitRateCh: make(chan *hashrate),
exitCh: make(chan chan error),
}
go ethash.remote()
return ethash
} }
// NewFaker creates a ethash consensus engine with a fake PoW scheme that accepts // NewFaker creates a ethash consensus engine with a fake PoW scheme that accepts
@ -489,6 +549,22 @@ func NewShared() *Ethash {
return &Ethash{shared: sharedEthash} return &Ethash{shared: sharedEthash}
} }
// Close closes the exit channel to notify all backend threads exiting.
func (ethash *Ethash) Close() error {
var err error
ethash.closeOnce.Do(func() {
// Short circuit if the exit channel is not allocated.
if ethash.exitCh == nil {
return
}
errc := make(chan error)
ethash.exitCh <- errc
err = <-errc
close(ethash.exitCh)
})
return err
}
// cache tries to retrieve a verification cache for the specified block number // cache tries to retrieve a verification cache for the specified block number
// by first checking against a list of in-memory caches, then against caches // by first checking against a list of in-memory caches, then against caches
// stored on disk, and finally generating one if none can be found. // stored on disk, and finally generating one if none can be found.
@ -561,14 +637,44 @@ func (ethash *Ethash) SetThreads(threads int) {
// Hashrate implements PoW, returning the measured rate of the search invocations // Hashrate implements PoW, returning the measured rate of the search invocations
// per second over the last minute. // per second over the last minute.
// Note the returned hashrate includes local hashrate, but also includes the total
// hashrate of all remote miner.
func (ethash *Ethash) Hashrate() float64 { func (ethash *Ethash) Hashrate() float64 {
return ethash.hashrate.Rate1() // Short circuit if we are run the ethash in normal/test mode.
if ethash.config.PowMode != ModeNormal && ethash.config.PowMode != ModeTest {
return ethash.hashrate.Rate1()
}
var res = make(chan uint64, 1)
select {
case ethash.fetchRateCh <- res:
case <-ethash.exitCh:
// Return local hashrate only if ethash is stopped.
return ethash.hashrate.Rate1()
}
// Gather total submitted hash rate of remote sealers.
return ethash.hashrate.Rate1() + float64(<-res)
} }
// APIs implements consensus.Engine, returning the user facing RPC APIs. Currently // APIs implements consensus.Engine, returning the user facing RPC APIs.
// that is empty.
func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API {
return nil // In order to ensure backward compatibility, we exposes ethash RPC APIs
// to both eth and ethash namespaces.
return []rpc.API{
{
Namespace: "eth",
Version: "1.0",
Service: &API{ethash},
Public: true,
},
{
Namespace: "ethash",
Version: "1.0",
Service: &API{ethash},
Public: true,
},
}
} }
// SeedHash is the seed to use for generating a verification cache and the mining // SeedHash is the seed to use for generating a verification cache and the mining

@ -23,7 +23,10 @@ import (
"os" "os"
"sync" "sync"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
@ -32,6 +35,7 @@ func TestTestMode(t *testing.T) {
head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
ethash := NewTester() ethash := NewTester()
defer ethash.Close()
block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil)
if err != nil { if err != nil {
t.Fatalf("failed to seal block: %v", err) t.Fatalf("failed to seal block: %v", err)
@ -52,6 +56,7 @@ func TestCacheFileEvict(t *testing.T) {
} }
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest})
defer e.Close()
workers := 8 workers := 8
epochs := 100 epochs := 100
@ -77,3 +82,90 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) {
e.VerifySeal(nil, head) e.VerifySeal(nil, head)
} }
} }
func TestRemoteSealer(t *testing.T) {
ethash := NewTester()
defer ethash.Close()
api := &API{ethash}
if _, err := api.GetWork(); err != errNoMiningWork {
t.Error("expect to return an error indicate there is no mining work")
}
head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
block := types.NewBlockWithHeader(head)
// Push new work.
ethash.Seal(nil, block, nil)
var (
work [3]string
err error
)
if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() {
t.Error("expect to return a mining work has same hash")
}
if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res {
t.Error("expect to return false when submit a fake solution")
}
// Push new block with same block number to replace the original one.
head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)}
block = types.NewBlockWithHeader(head)
ethash.Seal(nil, block, nil)
if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() {
t.Error("expect to return the latest pushed work")
}
// Push block with higher block number.
newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)}
newBlock := types.NewBlockWithHeader(newHead)
ethash.Seal(nil, newBlock, nil)
if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res {
t.Error("expect to return false when submit a stale solution")
}
}
func TestHashRate(t *testing.T) {
var (
ethash = NewTester()
api = &API{ethash}
hashrate = []hexutil.Uint64{100, 200, 300}
expect uint64
ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")}
)
defer ethash.Close()
if tot := ethash.Hashrate(); tot != 0 {
t.Error("expect the result should be zero")
}
for i := 0; i < len(hashrate); i += 1 {
if res := api.SubmitHashRate(hashrate[i], ids[i]); !res {
t.Error("remote miner submit hashrate failed")
}
expect += uint64(hashrate[i])
}
if tot := ethash.Hashrate(); tot != float64(expect) {
t.Error("expect total hashrate should be same")
}
}
func TestClosedRemoteSealer(t *testing.T) {
ethash := NewTester()
// Make sure exit channel has been listened
time.Sleep(1 * time.Second)
ethash.Close()
api := &API{ethash}
if _, err := api.GetWork(); err != errEthashStopped {
t.Error("expect to return an error to indicate ethash is stopped")
}
if res := api.SubmitHashRate(hexutil.Uint64(100), common.HexToHash("a")); res {
t.Error("expect to return false when submit hashrate to a stopped ethash")
}
}

@ -18,11 +18,13 @@ package ethash
import ( import (
crand "crypto/rand" crand "crypto/rand"
"errors"
"math" "math"
"math/big" "math/big"
"math/rand" "math/rand"
"runtime" "runtime"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
@ -30,6 +32,11 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
var (
errNoMiningWork = errors.New("no mining work available yet")
errInvalidSealResult = errors.New("invalid or stale proof-of-work solution")
)
// Seal implements consensus.Engine, attempting to find a nonce that satisfies // Seal implements consensus.Engine, attempting to find a nonce that satisfies
// the block's difficulty requirements. // the block's difficulty requirements.
func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
@ -45,7 +52,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop
} }
// Create a runner and the multiple search threads it directs // Create a runner and the multiple search threads it directs
abort := make(chan struct{}) abort := make(chan struct{})
found := make(chan *types.Block)
ethash.lock.Lock() ethash.lock.Lock()
threads := ethash.threads threads := ethash.threads
@ -64,12 +70,16 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop
if threads < 0 { if threads < 0 {
threads = 0 // Allows disabling local mining without extra logic around local/remote threads = 0 // Allows disabling local mining without extra logic around local/remote
} }
// Push new work to remote sealer
if ethash.workCh != nil {
ethash.workCh <- block
}
var pend sync.WaitGroup var pend sync.WaitGroup
for i := 0; i < threads; i++ { for i := 0; i < threads; i++ {
pend.Add(1) pend.Add(1)
go func(id int, nonce uint64) { go func(id int, nonce uint64) {
defer pend.Done() defer pend.Done()
ethash.mine(block, id, nonce, abort, found) ethash.mine(block, id, nonce, abort, ethash.resultCh)
}(i, uint64(ethash.rand.Int63())) }(i, uint64(ethash.rand.Int63()))
} }
// Wait until sealing is terminated or a nonce is found // Wait until sealing is terminated or a nonce is found
@ -78,7 +88,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop
case <-stop: case <-stop:
// Outside abort, stop all miner threads // Outside abort, stop all miner threads
close(abort) close(abort)
case result = <-found: case result = <-ethash.resultCh:
// One of the threads found a block, abort all others // One of the threads found a block, abort all others
close(abort) close(abort)
case <-ethash.update: case <-ethash.update:
@ -150,3 +160,136 @@ search:
// during sealing so it's not unmapped while being read. // during sealing so it's not unmapped while being read.
runtime.KeepAlive(dataset) runtime.KeepAlive(dataset)
} }
// remote starts a standalone goroutine to handle remote mining related stuff.
func (ethash *Ethash) remote() {
var (
works = make(map[common.Hash]*types.Block)
rates = make(map[common.Hash]hashrate)
currentWork *types.Block
)
// getWork returns a work package for external miner.
//
// The work package consists of 3 strings:
// result[0], 32 bytes hex encoded current block header pow-hash
// result[1], 32 bytes hex encoded seed hash used for DAG
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
getWork := func() ([3]string, error) {
var res [3]string
if currentWork == nil {
return res, errNoMiningWork
}
res[0] = currentWork.HashNoNonce().Hex()
res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex()
// Calculate the "target" to be returned to the external sealer.
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, currentWork.Difficulty())
n.Lsh(n, 1)
res[2] = common.BytesToHash(n.Bytes()).Hex()
// Trace the seal work fetched by remote sealer.
works[currentWork.HashNoNonce()] = currentWork
return res, nil
}
// submitWork verifies the submitted pow solution, returning
// whether the solution was accepted or not (not can be both a bad pow as well as
// any other error, like no pending work or stale mining result).
submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool {
// Make sure the work submitted is present
block := works[hash]
if block == nil {
log.Info("Work submitted but none pending", "hash", hash)
return false
}
// Verify the correctness of submitted result.
header := block.Header()
header.Nonce = nonce
header.MixDigest = mixDigest
if err := ethash.VerifySeal(nil, header); err != nil {
log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
return false
}
// Make sure the result channel is created.
if ethash.resultCh == nil {
log.Warn("Ethash result channel is empty, submitted mining result is rejected")
return false
}
// Solutions seems to be valid, return to the miner and notify acceptance.
select {
case ethash.resultCh <- block.WithSeal(header):
delete(works, hash)
return true
default:
log.Info("Work submitted is stale", "hash", hash)
return false
}
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case block := <-ethash.workCh:
if currentWork != nil && block.ParentHash() != currentWork.ParentHash() {
// Start new round mining, throw out all previous work.
works = make(map[common.Hash]*types.Block)
}
// Update current work with new received block.
// Note same work can be past twice, happens when changing CPU threads.
currentWork = block
case work := <-ethash.fetchWorkCh:
// Return current mining work to remote miner.
miningWork, err := getWork()
if err != nil {
work.errc <- err
} else {
work.res <- miningWork
}
case result := <-ethash.submitWorkCh:
// Verify submitted PoW solution based on maintained mining blocks.
if submitWork(result.nonce, result.mixDigest, result.hash) {
result.errc <- nil
} else {
result.errc <- errInvalidSealResult
}
case result := <-ethash.submitRateCh:
// Trace remote sealer's hash rate by submitted value.
rates[result.id] = hashrate{rate: result.rate, ping: time.Now()}
close(result.done)
case req := <-ethash.fetchRateCh:
// Gather all hash rate submitted by remote sealer.
var total uint64
for _, rate := range rates {
// this could overflow
total += rate.rate
}
req <- total
case <-ticker.C:
// Clear stale submitted hash rate.
for id, rate := range rates {
if time.Since(rate.ping) > 10*time.Second {
delete(rates, id)
}
}
case errc := <-ethash.exitCh:
// Exit remote loop if ethash is closed and return relevant error.
errc <- nil
log.Trace("Ethash remote sealer is exiting")
return
}
}
}

@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -70,16 +69,12 @@ func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 {
// PublicMinerAPI provides an API to control the miner. // PublicMinerAPI provides an API to control the miner.
// It offers only methods that operate on data that pose no security risk when it is publicly accessible. // It offers only methods that operate on data that pose no security risk when it is publicly accessible.
type PublicMinerAPI struct { type PublicMinerAPI struct {
e *Ethereum e *Ethereum
agent *miner.RemoteAgent
} }
// NewPublicMinerAPI create a new PublicMinerAPI instance. // NewPublicMinerAPI create a new PublicMinerAPI instance.
func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI { func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI {
agent := miner.NewRemoteAgent(e.BlockChain(), e.Engine()) return &PublicMinerAPI{e}
e.Miner().Register(agent)
return &PublicMinerAPI{e, agent}
} }
// Mining returns an indication if this node is currently mining. // Mining returns an indication if this node is currently mining.
@ -87,37 +82,6 @@ func (api *PublicMinerAPI) Mining() bool {
return api.e.IsMining() return api.e.IsMining()
} }
// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was
// accepted. Note, this is not an indication if the provided work was valid!
func (api *PublicMinerAPI) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool {
return api.agent.SubmitWork(nonce, digest, solution)
}
// GetWork returns a work package for external miner. The work package consists of 3 strings
// result[0], 32 bytes hex encoded current block header pow-hash
// result[1], 32 bytes hex encoded seed hash used for DAG
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
func (api *PublicMinerAPI) GetWork() ([3]string, error) {
if !api.e.IsMining() {
if err := api.e.StartMining(false); err != nil {
return [3]string{}, err
}
}
work, err := api.agent.GetWork()
if err != nil {
return work, fmt.Errorf("mining not ready: %v", err)
}
return work, nil
}
// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined
// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which
// must be unique between nodes.
func (api *PublicMinerAPI) SubmitHashrate(hashrate hexutil.Uint64, id common.Hash) bool {
api.agent.SubmitHashrate(id, uint64(hashrate))
return true
}
// PrivateMinerAPI provides private RPC methods to control the miner. // PrivateMinerAPI provides private RPC methods to control the miner.
// These methods can be abused by external users and must be considered insecure for use by untrusted users. // These methods can be abused by external users and must be considered insecure for use by untrusted users.
type PrivateMinerAPI struct { type PrivateMinerAPI struct {
@ -132,7 +96,8 @@ func NewPrivateMinerAPI(e *Ethereum) *PrivateMinerAPI {
// Start the miner with the given number of threads. If threads is nil the number // Start the miner with the given number of threads. If threads is nil the number
// of workers started is equal to the number of logical CPUs that are usable by // of workers started is equal to the number of logical CPUs that are usable by
// this process. If mining is already running, this method adjust the number of // this process. If mining is already running, this method adjust the number of
// threads allowed to use. // threads allowed to use and updates the minimum price required by the transaction
// pool.
func (api *PrivateMinerAPI) Start(threads *int) error { func (api *PrivateMinerAPI) Start(threads *int) error {
// Set the number of threads if the seal engine supports it // Set the number of threads if the seal engine supports it
if threads == nil { if threads == nil {
@ -153,7 +118,6 @@ func (api *PrivateMinerAPI) Start(threads *int) error {
api.e.lock.RLock() api.e.lock.RLock()
price := api.e.gasPrice price := api.e.gasPrice
api.e.lock.RUnlock() api.e.lock.RUnlock()
api.e.txPool.SetGasPrice(price) api.e.txPool.SetGasPrice(price)
return api.e.StartMining(true) return api.e.StartMining(true)
} }
@ -198,7 +162,7 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool {
// GetHashrate returns the current hashrate of the miner. // GetHashrate returns the current hashrate of the miner.
func (api *PrivateMinerAPI) GetHashrate() uint64 { func (api *PrivateMinerAPI) GetHashrate() uint64 {
return uint64(api.e.miner.HashRate()) return api.e.miner.HashRate()
} }
// PrivateAdminAPI is the collection of Ethereum full node-related APIs // PrivateAdminAPI is the collection of Ethereum full node-related APIs

@ -166,6 +166,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err return nil, err
} }
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.miner.SetExtra(makeExtraData(config.ExtraData))
@ -411,6 +412,7 @@ func (s *Ethereum) Start(srvr *p2p.Server) error {
func (s *Ethereum) Stop() error { func (s *Ethereum) Stop() error {
s.bloomIndexer.Close() s.bloomIndexer.Close()
s.blockchain.Stop() s.blockchain.Stop()
s.engine.Close()
s.protocolManager.Stop() s.protocolManager.Stop()
if s.lesServer != nil { if s.lesServer != nil {
s.lesServer.Stop() s.lesServer.Stop()
@ -421,6 +423,5 @@ func (s *Ethereum) Stop() error {
s.chainDb.Close() s.chainDb.Close()
close(s.shutdownChan) close(s.shutdownChan)
return nil return nil
} }

@ -21,6 +21,7 @@ var Modules = map[string]string{
"admin": Admin_JS, "admin": Admin_JS,
"chequebook": Chequebook_JS, "chequebook": Chequebook_JS,
"clique": Clique_JS, "clique": Clique_JS,
"ethash": Ethash_JS,
"debug": Debug_JS, "debug": Debug_JS,
"eth": Eth_JS, "eth": Eth_JS,
"miner": Miner_JS, "miner": Miner_JS,
@ -109,6 +110,34 @@ web3._extend({
}); });
` `
const Ethash_JS = `
web3._extend({
property: 'ethash',
methods: [
new web3._extend.Method({
name: 'getWork',
call: 'ethash_getWork',
params: 0
}),
new web3._extend.Method({
name: 'getHashrate',
call: 'ethash_getHashrate',
params: 0
}),
new web3._extend.Method({
name: 'submitWork',
call: 'ethash_submitWork',
params: 3,
}),
new web3._extend.Method({
name: 'submitHashRate',
call: 'ethash_submitHashRate',
params: 2,
}),
]
});
`
const Admin_JS = ` const Admin_JS = `
web3._extend({ web3._extend({
property: 'admin', property: 'admin',

@ -248,6 +248,7 @@ func (s *LightEthereum) Stop() error {
s.blockchain.Stop() s.blockchain.Stop()
s.protocolManager.Stop() s.protocolManager.Stop()
s.txPool.Stop() s.txPool.Stop()
s.engine.Close()
s.eventMux.Stop() s.eventMux.Stop()

@ -18,7 +18,6 @@ package miner
import ( import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
@ -36,24 +35,31 @@ type CpuAgent struct {
chain consensus.ChainReader chain consensus.ChainReader
engine consensus.Engine engine consensus.Engine
isMining int32 // isMining indicates whether the agent is currently mining started int32 // started indicates whether the agent is currently started
} }
func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
miner := &CpuAgent{ agent := &CpuAgent{
chain: chain, chain: chain,
engine: engine, engine: engine,
stop: make(chan struct{}, 1), stop: make(chan struct{}, 1),
workCh: make(chan *Work, 1), workCh: make(chan *Work, 1),
} }
return miner return agent
} }
func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) Work() chan<- *Work { return self.workCh }
func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
func (self *CpuAgent) Start() {
if !atomic.CompareAndSwapInt32(&self.started, 0, 1) {
return // agent already started
}
go self.update()
}
func (self *CpuAgent) Stop() { func (self *CpuAgent) Stop() {
if !atomic.CompareAndSwapInt32(&self.isMining, 1, 0) { if !atomic.CompareAndSwapInt32(&self.started, 1, 0) {
return // agent already stopped return // agent already stopped
} }
self.stop <- struct{}{} self.stop <- struct{}{}
@ -68,13 +74,6 @@ done:
} }
} }
func (self *CpuAgent) Start() {
if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
return // agent already started
}
go self.update()
}
func (self *CpuAgent) update() { func (self *CpuAgent) update() {
out: out:
for { for {
@ -110,10 +109,3 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
self.returnCh <- nil self.returnCh <- nil
} }
} }
func (self *CpuAgent) GetHashRate() int64 {
if pow, ok := self.engine.(consensus.PoW); ok {
return int64(pow.Hashrate())
}
return 0
}

@ -44,12 +44,9 @@ type Backend interface {
// Miner creates blocks and searches for proof-of-work values. // Miner creates blocks and searches for proof-of-work values.
type Miner struct { type Miner struct {
mux *event.TypeMux mux *event.TypeMux
worker *worker
worker *worker
coinbase common.Address coinbase common.Address
mining int32
eth Backend eth Backend
engine consensus.Engine engine consensus.Engine
@ -62,7 +59,7 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
eth: eth, eth: eth,
mux: mux, mux: mux,
engine: engine, engine: engine,
worker: newWorker(config, engine, common.Address{}, eth, mux), worker: newWorker(config, engine, eth, mux),
canStart: 1, canStart: 1,
} }
miner.Register(NewCpuAgent(eth.BlockChain(), engine)) miner.Register(NewCpuAgent(eth.BlockChain(), engine))
@ -111,23 +108,16 @@ func (self *Miner) Start(coinbase common.Address) {
log.Info("Network syncing, will start miner afterwards") log.Info("Network syncing, will start miner afterwards")
return return
} }
atomic.StoreInt32(&self.mining, 1)
log.Info("Starting mining operation")
self.worker.start() self.worker.start()
self.worker.commitNewWork() self.worker.commitNewWork()
} }
func (self *Miner) Stop() { func (self *Miner) Stop() {
self.worker.stop() self.worker.stop()
atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.shouldStart, 0) atomic.StoreInt32(&self.shouldStart, 0)
} }
func (self *Miner) Register(agent Agent) { func (self *Miner) Register(agent Agent) {
if self.Mining() {
agent.Start()
}
self.worker.register(agent) self.worker.register(agent)
} }
@ -136,22 +126,14 @@ func (self *Miner) Unregister(agent Agent) {
} }
func (self *Miner) Mining() bool { func (self *Miner) Mining() bool {
return atomic.LoadInt32(&self.mining) > 0 return self.worker.isRunning()
} }
func (self *Miner) HashRate() (tot int64) { func (self *Miner) HashRate() uint64 {
if pow, ok := self.engine.(consensus.PoW); ok { if pow, ok := self.engine.(consensus.PoW); ok {
tot += int64(pow.Hashrate()) return uint64(pow.Hashrate())
} }
// do we care this might race? is it worth we're rewriting some return 0
// aspects of the worker/locking up agents so we can get an accurate
// hashrate?
for agent := range self.worker.agents {
if _, ok := agent.(*CpuAgent); !ok {
tot += agent.GetHashRate()
}
}
return
} }
func (self *Miner) SetExtra(extra []byte) error { func (self *Miner) SetExtra(extra []byte) error {

@ -1,202 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"errors"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type hashrate struct {
ping time.Time
rate uint64
}
type RemoteAgent struct {
mu sync.Mutex
quitCh chan struct{}
workCh chan *Work
returnCh chan<- *Result
chain consensus.ChainReader
engine consensus.Engine
currentWork *Work
work map[common.Hash]*Work
hashrateMu sync.RWMutex
hashrate map[common.Hash]hashrate
running int32 // running indicates whether the agent is active. Call atomically
}
func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent {
return &RemoteAgent{
chain: chain,
engine: engine,
work: make(map[common.Hash]*Work),
hashrate: make(map[common.Hash]hashrate),
}
}
func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) {
a.hashrateMu.Lock()
defer a.hashrateMu.Unlock()
a.hashrate[id] = hashrate{time.Now(), rate}
}
func (a *RemoteAgent) Work() chan<- *Work {
return a.workCh
}
func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) {
a.returnCh = returnCh
}
func (a *RemoteAgent) Start() {
if !atomic.CompareAndSwapInt32(&a.running, 0, 1) {
return
}
a.quitCh = make(chan struct{})
a.workCh = make(chan *Work, 1)
go a.loop(a.workCh, a.quitCh)
}
func (a *RemoteAgent) Stop() {
if !atomic.CompareAndSwapInt32(&a.running, 1, 0) {
return
}
close(a.quitCh)
close(a.workCh)
}
// GetHashRate returns the accumulated hashrate of all identifier combined
func (a *RemoteAgent) GetHashRate() (tot int64) {
a.hashrateMu.RLock()
defer a.hashrateMu.RUnlock()
// this could overflow
for _, hashrate := range a.hashrate {
tot += int64(hashrate.rate)
}
return
}
func (a *RemoteAgent) GetWork() ([3]string, error) {
a.mu.Lock()
defer a.mu.Unlock()
var res [3]string
if a.currentWork != nil {
block := a.currentWork.Block
res[0] = block.HashNoNonce().Hex()
seedHash := ethash.SeedHash(block.NumberU64())
res[1] = common.BytesToHash(seedHash).Hex()
// Calculate the "target" to be returned to the external miner
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, block.Difficulty())
n.Lsh(n, 1)
res[2] = common.BytesToHash(n.Bytes()).Hex()
a.work[block.HashNoNonce()] = a.currentWork
return res, nil
}
return res, errors.New("No work available yet, don't panic.")
}
// SubmitWork tries to inject a pow solution into the remote agent, returning
// whether the solution was accepted or not (not can be both a bad pow as well as
// any other error, like no work pending).
func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
a.mu.Lock()
defer a.mu.Unlock()
// Make sure the work submitted is present
work := a.work[hash]
if work == nil {
log.Info("Work submitted but none pending", "hash", hash)
return false
}
// Make sure the Engine solutions is indeed valid
result := work.Block.Header()
result.Nonce = nonce
result.MixDigest = mixDigest
if err := a.engine.VerifySeal(a.chain, result); err != nil {
log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
return false
}
block := work.Block.WithSeal(result)
// Solutions seems to be valid, return to the miner and notify acceptance
a.returnCh <- &Result{work, block}
delete(a.work, hash)
return true
}
// loop monitors mining events on the work and quit channels, updating the internal
// state of the remote miner until a termination is requested.
//
// Note, the reason the work and quit channels are passed as parameters is because
// RemoteAgent.Start() constantly recreates these channels, so the loop code cannot
// assume data stability in these member fields.
func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-quitCh:
return
case work := <-workCh:
a.mu.Lock()
a.currentWork = work
a.mu.Unlock()
case <-ticker.C:
// cleanup
a.mu.Lock()
for hash, work := range a.work {
if time.Since(work.createdAt) > 7*(12*time.Second) {
delete(a.work, hash)
}
}
a.mu.Unlock()
a.hashrateMu.Lock()
for id, hashrate := range a.hashrate {
if time.Since(hashrate.ping) > 10*time.Second {
delete(a.hashrate, id)
}
}
a.hashrateMu.Unlock()
}
}
}

@ -55,9 +55,8 @@ const (
type Agent interface { type Agent interface {
Work() chan<- *Work Work() chan<- *Work
SetReturnCh(chan<- *Result) SetReturnCh(chan<- *Result)
Stop()
Start() Start()
GetHashRate() int64 Stop()
} }
// Work is the workers current environment and holds // Work is the workers current environment and holds
@ -102,7 +101,6 @@ type worker struct {
chainHeadSub event.Subscription chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription chainSideSub event.Subscription
wg sync.WaitGroup
agents map[Agent]struct{} agents map[Agent]struct{}
recv chan *Result recv chan *Result
@ -128,11 +126,11 @@ type worker struct {
unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
// atomic status counters // atomic status counters
mining int32 atWork int32 // The number of in-flight consensus engine work.
atWork int32 running int32 // The indicator whether the consensus engine is running or not.
} }
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
worker := &worker{ worker := &worker{
config: config, config: config,
engine: engine, engine: engine,
@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
chain: eth.BlockChain(), chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(), proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block), possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
agents: make(map[Agent]struct{}), agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
} }
@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) {
} }
func (self *worker) pending() (*types.Block, *state.StateDB) { func (self *worker) pending() (*types.Block, *state.StateDB) {
if atomic.LoadInt32(&self.mining) == 0 { // return a snapshot to avoid contention on currentMu mutex
// return a snapshot to avoid contention on currentMu mutex self.snapshotMu.RLock()
self.snapshotMu.RLock() defer self.snapshotMu.RUnlock()
defer self.snapshotMu.RUnlock() return self.snapshotBlock, self.snapshotState.Copy()
return self.snapshotBlock, self.snapshotState.Copy()
}
self.currentMu.Lock()
defer self.currentMu.Unlock()
return self.current.Block, self.current.state.Copy()
} }
func (self *worker) pendingBlock() *types.Block { func (self *worker) pendingBlock() *types.Block {
if atomic.LoadInt32(&self.mining) == 0 { // return a snapshot to avoid contention on currentMu mutex
// return a snapshot to avoid contention on currentMu mutex self.snapshotMu.RLock()
self.snapshotMu.RLock() defer self.snapshotMu.RUnlock()
defer self.snapshotMu.RUnlock() return self.snapshotBlock
return self.snapshotBlock
}
self.currentMu.Lock()
defer self.currentMu.Unlock()
return self.current.Block
} }
func (self *worker) start() { func (self *worker) start() {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
atomic.StoreInt32(&self.running, 1)
atomic.StoreInt32(&self.mining, 1)
// spin up agents
for agent := range self.agents { for agent := range self.agents {
agent.Start() agent.Start()
} }
} }
func (self *worker) stop() { func (self *worker) stop() {
self.wg.Wait()
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
if atomic.LoadInt32(&self.mining) == 1 {
for agent := range self.agents { atomic.StoreInt32(&self.running, 0)
agent.Stop() for agent := range self.agents {
} agent.Stop()
} }
atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.atWork, 0) atomic.StoreInt32(&self.atWork, 0)
} }
func (self *worker) isRunning() bool {
return atomic.LoadInt32(&self.running) == 1
}
func (self *worker) register(agent Agent) { func (self *worker) register(agent Agent) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
self.agents[agent] = struct{}{} self.agents[agent] = struct{}{}
agent.SetReturnCh(self.recv) agent.SetReturnCh(self.recv)
if self.isRunning() {
agent.Start()
}
} }
func (self *worker) unregister(agent Agent) { func (self *worker) unregister(agent Agent) {
@ -266,7 +252,7 @@ func (self *worker) update() {
// Note all transactions received may not be continuous with transactions // Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will // already included in the current mining block. These transactions will
// be automatically eliminated. // be automatically eliminated.
if atomic.LoadInt32(&self.mining) == 0 { if !self.isRunning() {
self.currentMu.Lock() self.currentMu.Lock()
txs := make(map[common.Address]types.Transactions) txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs { for _, tx := range ev.Txs {
@ -343,9 +329,6 @@ func (self *worker) wait() {
// push sends a new work task to currently live miner agents. // push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) { func (self *worker) push(work *Work) {
if atomic.LoadInt32(&self.mining) != 1 {
return
}
for agent := range self.agents { for agent := range self.agents {
atomic.AddInt32(&self.atWork, 1) atomic.AddInt32(&self.atWork, 1)
if ch := agent.Work(); ch != nil { if ch := agent.Work(); ch != nil {
@ -416,8 +399,12 @@ func (self *worker) commitNewWork() {
Extra: self.extra, Extra: self.extra,
Time: big.NewInt(tstamp), Time: big.NewInt(tstamp),
} }
// Only set the coinbase if we are mining (avoid spurious block rewards) // Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
if atomic.LoadInt32(&self.mining) == 1 { if self.isRunning() {
if self.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
header.Coinbase = self.coinbase header.Coinbase = self.coinbase
} }
if err := self.engine.Prepare(self.chain, header); err != nil { if err := self.engine.Prepare(self.chain, header); err != nil {
@ -448,13 +435,6 @@ func (self *worker) commitNewWork() {
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(work.state) misc.ApplyDAOHardFork(work.state)
} }
pending, err := self.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// compute uncles for the new block. // compute uncles for the new block.
var ( var (
@ -478,17 +458,41 @@ func (self *worker) commitNewWork() {
for _, hash := range badUncles { for _, hash := range badUncles {
delete(self.possibleUncles, hash) delete(self.possibleUncles, hash)
} }
// Create the new block to seal with the consensus engine
// Create an empty block based on temporary copied state for sealing in advance without waiting block
// execution finished.
if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil {
log.Error("Failed to finalize block for temporary sealing", "err", err)
} else {
// Push empty work in advance without applying pending transaction.
// The reason is transactions execution can cost a lot and sealer need to
// take advantage of this part time.
if self.isRunning() {
log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles))
self.push(work)
}
}
// Fill the block with all available pending transactions.
pending, err := self.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// Create the full block to seal with the consensus engine
if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err) log.Error("Failed to finalize block for sealing", "err", err)
return return
} }
// We only care about logging if we're actually mining. // We only care about logging if we're actually mining.
if atomic.LoadInt32(&self.mining) == 1 { if self.isRunning() {
log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(work.Block.NumberU64() - 1) self.unconfirmed.Shift(work.Block.NumberU64() - 1)
self.push(work)
} }
self.push(work)
self.updateSnapshot() self.updateSnapshot()
} }
@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() {
self.snapshotMu.Lock() self.snapshotMu.Lock()
defer self.snapshotMu.Unlock() defer self.snapshotMu.Unlock()
var uncles []*types.Header
self.current.uncles.Each(func(item interface{}) bool {
if header, ok := item.(*types.Header); ok {
uncles = append(uncles, header)
return true
}
return false
})
self.snapshotBlock = types.NewBlock( self.snapshotBlock = types.NewBlock(
self.current.header, self.current.header,
self.current.txs, self.current.txs,
nil, uncles,
self.current.receipts, self.current.receipts,
) )
self.snapshotState = self.current.state.Copy() self.snapshotState = self.current.state.Copy()