cmd/geth, miner, backend, xeth: Fixed miner threads to be settable

Miner threads are now settable through the admin interface (closes #897)
and specify 0 CPU worker threads when eth_getWork is called (closes #916)
This commit is contained in:
obscuren 2015-05-11 15:43:14 +02:00
parent 064cf16099
commit 21e52efdfe
9 changed files with 46 additions and 43 deletions

@ -275,14 +275,13 @@ func (js *jsre) verbosity(call otto.FunctionCall) otto.Value {
} }
func (js *jsre) startMining(call otto.FunctionCall) otto.Value { func (js *jsre) startMining(call otto.FunctionCall) otto.Value {
_, err := call.Argument(0).ToInteger() threads, err := call.Argument(0).ToInteger()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return otto.FalseValue() return otto.FalseValue()
} }
// threads now ignored
err = js.ethereum.StartMining() err = js.ethereum.StartMining(int(threads))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return otto.FalseValue() return otto.FalseValue()

@ -401,7 +401,7 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) {
} }
} }
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { if ctx.GlobalBool(utils.MiningEnabledFlag.Name) {
if err := eth.StartMining(); err != nil { if err := eth.StartMining(ctx.GlobalInt(utils.MinerThreadsFlag.Name)); err != nil {
utils.Fatalf("%v", err) utils.Fatalf("%v", err)
} }
} }

@ -159,7 +159,7 @@ func (self *UiLib) RemoveLocalTransaction(id int) {
func (self *UiLib) ToggleMining() bool { func (self *UiLib) ToggleMining() bool {
if !self.eth.IsMining() { if !self.eth.IsMining() {
err := self.eth.StartMining() err := self.eth.StartMining(4)
return err == nil return err == nil
} else { } else {
self.eth.StopMining() self.eth.StopMining()

@ -267,7 +267,7 @@ func New(config *Config) (*Ethereum, error) {
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor) eth.chainManager.SetProcessor(eth.blockProcessor)
eth.miner = miner.New(eth, eth.pow, config.MinerThreads) eth.miner = miner.New(eth, eth.pow)
eth.miner.SetGasPrice(config.GasPrice) eth.miner.SetGasPrice(config.GasPrice)
eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
@ -368,7 +368,7 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
s.chainManager.ResetWithGenesisBlock(gb) s.chainManager.ResetWithGenesisBlock(gb)
} }
func (s *Ethereum) StartMining() error { func (s *Ethereum) StartMining(threads int) error {
eb, err := s.Etherbase() eb, err := s.Etherbase()
if err != nil { if err != nil {
err = fmt.Errorf("Cannot start mining without etherbase address: %v", err) err = fmt.Errorf("Cannot start mining without etherbase address: %v", err)
@ -376,7 +376,7 @@ func (s *Ethereum) StartMining() error {
return err return err
} }
go s.miner.Start(eb) go s.miner.Start(eb, threads)
return nil return nil
} }
@ -461,13 +461,13 @@ done:
case <-ticker.C: case <-ticker.C:
// don't change the order of database flushes // don't change the order of database flushes
if err := s.extraDb.Flush(); err != nil { if err := s.extraDb.Flush(); err != nil {
glog.Fatalf("fatal error: flush extraDb: %v\n", err) glog.Fatalf("fatal error: flush extraDb: %v (Restart your node. We are aware of this issue)\n", err)
} }
if err := s.stateDb.Flush(); err != nil { if err := s.stateDb.Flush(); err != nil {
glog.Fatalf("fatal error: flush stateDb: %v\n", err) glog.Fatalf("fatal error: flush stateDb: %v (Restart your node. We are aware of this issue)\n", err)
} }
if err := s.blockDb.Flush(); err != nil { if err := s.blockDb.Flush(); err != nil {
glog.Fatalf("fatal error: flush blockDb: %v\n", err) glog.Fatalf("fatal error: flush blockDb: %v (Restart your node. We are aware of this issue)\n", err)
} }
case <-s.shutdownChan: case <-s.shutdownChan:
break done break done

@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
) )
type CpuMiner struct { type CpuAgent struct {
chMu sync.Mutex chMu sync.Mutex
c chan *types.Block c chan *types.Block
quit chan struct{} quit chan struct{}
@ -21,8 +21,8 @@ type CpuMiner struct {
pow pow.PoW pow pow.PoW
} }
func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { func NewCpuAgent(index int, pow pow.PoW) *CpuAgent {
miner := &CpuMiner{ miner := &CpuAgent{
pow: pow, pow: pow,
index: index, index: index,
} }
@ -30,16 +30,16 @@ func NewCpuMiner(index int, pow pow.PoW) *CpuMiner {
return miner return miner
} }
func (self *CpuMiner) Work() chan<- *types.Block { return self.c } func (self *CpuAgent) Work() chan<- *types.Block { return self.c }
func (self *CpuMiner) Pow() pow.PoW { return self.pow } func (self *CpuAgent) Pow() pow.PoW { return self.pow }
func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } func (self *CpuAgent) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch }
func (self *CpuMiner) Stop() { func (self *CpuAgent) Stop() {
close(self.quit) close(self.quit)
close(self.quitCurrentOp) close(self.quitCurrentOp)
} }
func (self *CpuMiner) Start() { func (self *CpuAgent) Start() {
self.quit = make(chan struct{}) self.quit = make(chan struct{})
self.quitCurrentOp = make(chan struct{}, 1) self.quitCurrentOp = make(chan struct{}, 1)
self.c = make(chan *types.Block, 1) self.c = make(chan *types.Block, 1)
@ -47,7 +47,7 @@ func (self *CpuMiner) Start() {
go self.update() go self.update()
} }
func (self *CpuMiner) update() { func (self *CpuAgent) update() {
out: out:
for { for {
select { select {
@ -76,7 +76,7 @@ done:
} }
} }
func (self *CpuMiner) mine(block *types.Block) { func (self *CpuAgent) mine(block *types.Block) {
glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index) glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index)
// Reset the channel // Reset the channel
@ -95,6 +95,6 @@ func (self *CpuMiner) mine(block *types.Block) {
} }
} }
func (self *CpuMiner) GetHashRate() int64 { func (self *CpuAgent) GetHashRate() int64 {
return self.pow.GetHashrate() return self.pow.GetHashrate()
} }

@ -23,16 +23,8 @@ type Miner struct {
pow pow.PoW pow pow.PoW
} }
func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner { func New(eth core.Backend, pow pow.PoW) *Miner {
// note: minerThreads is currently ignored because return &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)}
// ethash is not thread safe.
miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)}
for i := 0; i < minerThreads; i++ {
miner.worker.register(NewCpuMiner(i, pow))
}
miner.threads = minerThreads
return miner
} }
func (self *Miner) Mining() bool { func (self *Miner) Mining() bool {
@ -48,15 +40,27 @@ func (m *Miner) SetGasPrice(price *big.Int) {
m.worker.gasPrice = price m.worker.gasPrice = price
} }
func (self *Miner) Start(coinbase common.Address) { func (self *Miner) Start(coinbase common.Address, threads int) {
glog.V(logger.Info).Infoln("Starting mining operation")
self.mining = true self.mining = true
for i := 0; i < threads; i++ {
self.worker.register(NewCpuAgent(i, self.pow))
}
self.threads = threads
glog.V(logger.Info).Infof("Starting mining operation (CPU=%d TOT=%d)\n", threads, len(self.worker.agents))
self.worker.coinbase = coinbase self.worker.coinbase = coinbase
self.worker.start() self.worker.start()
self.worker.commitNewWork() self.worker.commitNewWork()
} }
func (self *Miner) Stop() {
self.worker.stop()
self.mining = false
}
func (self *Miner) Register(agent Agent) { func (self *Miner) Register(agent Agent) {
if self.mining { if self.mining {
agent.Start() agent.Start()
@ -65,11 +69,6 @@ func (self *Miner) Register(agent Agent) {
self.worker.register(agent) self.worker.register(agent)
} }
func (self *Miner) Stop() {
self.mining = false
self.worker.stop()
}
func (self *Miner) HashRate() int64 { func (self *Miner) HashRate() int64 {
return self.worker.HashRate() return self.worker.HashRate()
} }

@ -141,7 +141,6 @@ func (self *worker) start() {
for _, agent := range self.agents { for _, agent := range self.agents {
agent.Start() agent.Start()
} }
} }
func (self *worker) stop() { func (self *worker) stop() {
@ -149,10 +148,16 @@ func (self *worker) stop() {
defer self.mu.Unlock() defer self.mu.Unlock()
if atomic.LoadInt32(&self.mining) == 1 { if atomic.LoadInt32(&self.mining) == 1 {
var keep []Agent
// stop all agents // stop all agents
for _, agent := range self.agents { for _, agent := range self.agents {
agent.Stop() agent.Stop()
// keep all that's not a cpu agent
if _, ok := agent.(*CpuAgent); !ok {
keep = append(keep, agent)
}
} }
self.agents = keep
} }
atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.mining, 0)

@ -391,7 +391,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
} }
*reply = NewLogsRes(api.xeth().AllLogs(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)) *reply = NewLogsRes(api.xeth().AllLogs(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics))
case "eth_getWork": case "eth_getWork":
api.xeth().SetMining(true) api.xeth().SetMining(true, 0)
*reply = api.xeth().RemoteMining().GetWork() *reply = api.xeth().RemoteMining().GetWork()
case "eth_submitWork": case "eth_submitWork":
args := new(SubmitWorkArgs) args := new(SubmitWorkArgs)

@ -425,10 +425,10 @@ func (self *XEth) ClientVersion() string {
return self.backend.ClientVersion() return self.backend.ClientVersion()
} }
func (self *XEth) SetMining(shouldmine bool) bool { func (self *XEth) SetMining(shouldmine bool, threads int) bool {
ismining := self.backend.IsMining() ismining := self.backend.IsMining()
if shouldmine && !ismining { if shouldmine && !ismining {
err := self.backend.StartMining() err := self.backend.StartMining(threads)
return err == nil return err == nil
} }
if ismining && !shouldmine { if ismining && !shouldmine {