Fixed miner

* Miners could stall because the worker wasn't aware the miner was done
This commit is contained in:
obscuren 2015-03-26 17:45:03 +01:00
parent d0fa0a234d
commit d36501a6e5
3 changed files with 35 additions and 15 deletions

@ -1,12 +1,15 @@
package miner package miner
import ( import (
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
) )
type CpuMiner struct { type CpuMiner struct {
chMu sync.Mutex
c chan *types.Block c chan *types.Block
quit chan struct{} quit chan struct{}
quitCurrentOp chan struct{} quitCurrentOp chan struct{}
@ -43,16 +46,13 @@ func (self *CpuMiner) Start() {
} }
func (self *CpuMiner) update() { func (self *CpuMiner) update() {
justStarted := true
out: out:
for { for {
select { select {
case block := <-self.c: case block := <-self.c:
if justStarted { self.chMu.Lock()
justStarted = true self.quitCurrentOp <- struct{}{}
} else { self.chMu.Unlock()
self.quitCurrentOp <- struct{}{}
}
go self.mine(block) go self.mine(block)
case <-self.quit: case <-self.quit:
@ -60,6 +60,7 @@ out:
} }
} }
close(self.quitCurrentOp)
done: done:
// Empty channel // Empty channel
for { for {
@ -75,12 +76,20 @@ done:
func (self *CpuMiner) mine(block *types.Block) { func (self *CpuMiner) mine(block *types.Block) {
minerlogger.Debugf("(re)started agent[%d]. mining...\n", self.index) minerlogger.Debugf("(re)started agent[%d]. mining...\n", self.index)
// Reset the channel
self.chMu.Lock()
self.quitCurrentOp = make(chan struct{}, 1)
self.chMu.Unlock()
// Mine
nonce, mixDigest, _ := self.pow.Search(block, self.quitCurrentOp) nonce, mixDigest, _ := self.pow.Search(block, self.quitCurrentOp)
if nonce != 0 { if nonce != 0 {
block.SetNonce(nonce) block.SetNonce(nonce)
block.Header().MixDigest = common.BytesToHash(mixDigest) block.Header().MixDigest = common.BytesToHash(mixDigest)
self.returnCh <- block self.returnCh <- block
//self.returnCh <- Work{block.Number().Uint64(), nonce, mixDigest, seedHash} } else {
self.returnCh <- nil
} }
} }

@ -50,6 +50,7 @@ out:
break out break out
case work := <-a.workCh: case work := <-a.workCh:
a.work = work a.work = work
a.returnCh <- nil
} }
} }
} }

@ -5,6 +5,7 @@ import (
"math/big" "math/big"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -58,13 +59,14 @@ type Agent interface {
} }
type worker struct { type worker struct {
mu sync.Mutex mu sync.Mutex
agents []Agent agents []Agent
recv chan *types.Block recv chan *types.Block
mux *event.TypeMux mux *event.TypeMux
quit chan struct{} quit chan struct{}
pow pow.PoW pow pow.PoW
atWork int atWork int64
eth core.Backend eth core.Backend
chain *core.ChainManager chain *core.ChainManager
@ -107,7 +109,7 @@ func (self *worker) start() {
func (self *worker) stop() { func (self *worker) stop() {
self.mining = false self.mining = false
self.atWork = 0 atomic.StoreInt64(&self.atWork, 0)
close(self.quit) close(self.quit)
} }
@ -135,9 +137,6 @@ out:
self.uncleMu.Unlock() self.uncleMu.Unlock()
} }
if self.atWork == 0 {
self.commitNewWork()
}
case <-self.quit: case <-self.quit:
// stop all agents // stop all agents
for _, agent := range self.agents { for _, agent := range self.agents {
@ -146,6 +145,11 @@ out:
break out break out
case <-timer.C: case <-timer.C:
minerlogger.Infoln("Hash rate:", self.HashRate(), "Khash") minerlogger.Infoln("Hash rate:", self.HashRate(), "Khash")
// XXX In case all mined a possible uncle
if atomic.LoadInt64(&self.atWork) == 0 {
self.commitNewWork()
}
} }
} }
@ -155,6 +159,12 @@ out:
func (self *worker) wait() { func (self *worker) wait() {
for { for {
for block := range self.recv { for block := range self.recv {
atomic.AddInt64(&self.atWork, -1)
if block == nil {
continue
}
if err := self.chain.InsertChain(types.Blocks{block}); err == nil { if err := self.chain.InsertChain(types.Blocks{block}); err == nil {
for _, uncle := range block.Uncles() { for _, uncle := range block.Uncles() {
delete(self.possibleUncles, uncle.Hash()) delete(self.possibleUncles, uncle.Hash())
@ -170,7 +180,6 @@ func (self *worker) wait() {
} else { } else {
self.commitNewWork() self.commitNewWork()
} }
self.atWork--
} }
} }
} }
@ -182,8 +191,9 @@ func (self *worker) push() {
// push new work to agents // push new work to agents
for _, agent := range self.agents { for _, agent := range self.agents {
atomic.AddInt64(&self.atWork, 1)
agent.Work() <- self.current.block.Copy() agent.Work() <- self.current.block.Copy()
self.atWork++
} }
} }
} }