core, eth, event, miner, xeth: fix event post / subscription race

This commit is contained in:
Péter Szilágyi 2015-10-12 15:04:38 +03:00
parent 315a422ba7
commit 402fd6e8c6
11 changed files with 123 additions and 94 deletions

@ -483,13 +483,6 @@ func (bc *BlockChain) Stop() {
glog.V(logger.Info).Infoln("Chain manager stopped") glog.V(logger.Info).Infoln("Chain manager stopped")
} }
type queueEvent struct {
queue []interface{}
canonicalCount int
sideCount int
splitCount int
}
func (self *BlockChain) procFutureBlocks() { func (self *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, self.futureBlocks.Len()) blocks := make([]*types.Block, self.futureBlocks.Len())
for i, hash := range self.futureBlocks.Keys() { for i, hash := range self.futureBlocks.Keys() {
@ -573,10 +566,9 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// faster than direct delivery and requires much less mutex // faster than direct delivery and requires much less mutex
// acquiring. // acquiring.
var ( var (
queue = make([]interface{}, len(chain)) stats struct{ queued, processed, ignored int }
queueEvent = queueEvent{queue: queue} events = make([]interface{}, 0, len(chain))
stats struct{ queued, processed, ignored int } tstart = time.Now()
tstart = time.Now()
nonceChecked = make([]bool, len(chain)) nonceChecked = make([]bool, len(chain))
) )
@ -659,22 +651,21 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if glog.V(logger.Debug) { if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
} }
queue[i] = ChainEvent{block, block.Hash(), logs} events = append(events, ChainEvent{block, block.Hash(), logs})
queueEvent.canonicalCount++
// This puts transactions in a extra db for rpc // This puts transactions in a extra db for rpc
PutTransactions(self.chainDb, block, block.Transactions()) PutTransactions(self.chainDb, block, block.Transactions())
// store the receipts // store the receipts
PutReceipts(self.chainDb, receipts) PutReceipts(self.chainDb, receipts)
case SideStatTy: case SideStatTy:
if glog.V(logger.Detail) { if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
} }
queue[i] = ChainSideEvent{block, logs} events = append(events, ChainSideEvent{block, logs})
queueEvent.sideCount++
case SplitStatTy: case SplitStatTy:
queue[i] = ChainSplitEvent{block, logs} events = append(events, ChainSplitEvent{block, logs})
queueEvent.splitCount++
} }
stats.processed++ stats.processed++
} }
@ -684,8 +675,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
start, end := chain[0], chain[len(chain)-1] start, end := chain[0], chain[len(chain)-1]
glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
} }
go self.postChainEvents(events)
go self.eventMux.Post(queueEvent)
return 0, nil return 0, nil
} }
@ -774,32 +764,31 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return nil return nil
} }
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
func (self *BlockChain) postChainEvents(events []interface{}) {
for _, event := range events {
if event, ok := event.(ChainEvent); ok {
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
// and in most cases isn't even necessary.
if self.currentBlock.Hash() == event.Hash {
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
}
// Fire the insertion events individually too
self.eventMux.Post(event)
}
}
func (self *BlockChain) update() { func (self *BlockChain) update() {
events := self.eventMux.Subscribe(queueEvent{})
futureTimer := time.Tick(5 * time.Second) futureTimer := time.Tick(5 * time.Second)
out:
for { for {
select { select {
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
for _, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
// and in most cases isn't even necessary.
if self.currentBlock.Hash() == event.Hash {
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
}
self.eventMux.Post(event)
}
}
case <-futureTimer: case <-futureTimer:
self.procFutureBlocks() self.procFutureBlocks()
case <-self.quit: case <-self.quit:
break out return
} }
} }
} }

@ -93,7 +93,7 @@ func (pool *TxPool) eventLoop() {
// we need to know the new state. The new state will help us determine // we need to know the new state. The new state will help us determine
// the nonces in the managed state // the nonces in the managed state
for ev := range pool.events.Chan() { for ev := range pool.events.Chan() {
switch ev := ev.(type) { switch ev := ev.Data.(type) {
case ChainHeadEvent: case ChainHeadEvent:
pool.mu.Lock() pool.mu.Lock()
pool.resetState() pool.resetState()

@ -20,6 +20,7 @@ package filters
import ( import (
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -35,6 +36,7 @@ type FilterSystem struct {
filterMu sync.RWMutex filterMu sync.RWMutex
filterId int filterId int
filters map[int]*Filter filters map[int]*Filter
created map[int]time.Time
quit chan struct{} quit chan struct{}
} }
@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{ fs := &FilterSystem{
eventMux: mux, eventMux: mux,
filters: make(map[int]*Filter), filters: make(map[int]*Filter),
created: make(map[int]time.Time),
} }
go fs.filterLoop() go fs.filterLoop()
return fs return fs
@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
defer fs.filterMu.Unlock() defer fs.filterMu.Unlock()
id = fs.filterId id = fs.filterId
fs.filters[id] = filter fs.filters[id] = filter
fs.created[id] = time.Now()
fs.filterId++ fs.filterId++
return id return id
@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
func (fs *FilterSystem) Remove(id int) { func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock() fs.filterMu.Lock()
defer fs.filterMu.Unlock() defer fs.filterMu.Unlock()
if _, ok := fs.filters[id]; ok {
delete(fs.filters, id) delete(fs.filters, id)
} delete(fs.created, id)
} }
// Get retrieves a filter installed using Add The filter may not be modified. // Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter { func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock() fs.filterMu.RLock()
defer fs.filterMu.RUnlock() defer fs.filterMu.RUnlock()
return fs.filters[id] return fs.filters[id]
} }
@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter {
// when the filter matches the requirements. // when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() { func (fs *FilterSystem) filterLoop() {
// Subscribe to events // Subscribe to events
events := fs.eventMux.Subscribe( eventCh := fs.eventMux.Subscribe(
//core.PendingBlockEvent{}, //core.PendingBlockEvent{},
core.ChainEvent{}, core.ChainEvent{},
core.TxPreEvent{}, core.TxPreEvent{},
vm.Logs(nil)) vm.Logs(nil),
).Chan()
out: out:
for { for {
select { select {
case <-fs.quit: case <-fs.quit:
break out break out
case event := <-events.Chan(): case event, ok := <-eventCh:
switch event := event.(type) { if !ok {
// Event subscription closed, set the channel to nil to stop spinning
eventCh = nil
continue
}
// A real event arrived, notify the registered filters
switch ev := event.Data.(type) {
case core.ChainEvent: case core.ChainEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range fs.filters { for id, filter := range fs.filters {
if filter.BlockCallback != nil { if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
filter.BlockCallback(event.Block, event.Logs) filter.BlockCallback(ev.Block, ev.Logs)
} }
} }
fs.filterMu.RUnlock() fs.filterMu.RUnlock()
case core.TxPreEvent: case core.TxPreEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range fs.filters { for id, filter := range fs.filters {
if filter.TransactionCallback != nil { if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
filter.TransactionCallback(event.Tx) filter.TransactionCallback(ev.Tx)
} }
} }
fs.filterMu.RUnlock() fs.filterMu.RUnlock()
case vm.Logs: case vm.Logs:
fs.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range fs.filters { for id, filter := range fs.filters {
if filter.LogsCallback != nil { if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
msgs := filter.FilterLogs(event) msgs := filter.FilterLogs(ev)
if len(msgs) > 0 { if len(msgs) > 0 {
filter.LogsCallback(msgs) filter.LogsCallback(msgs)
} }

@ -84,19 +84,16 @@ func (self *GasPriceOracle) processPastBlocks() {
} }
func (self *GasPriceOracle) listenLoop() { func (self *GasPriceOracle) listenLoop() {
for { defer self.events.Unsubscribe()
ev, isopen := <-self.events.Chan()
if !isopen { for event := range self.events.Chan() {
break switch event := event.Data.(type) {
}
switch ev := ev.(type) {
case core.ChainEvent: case core.ChainEvent:
self.processBlock(ev.Block) self.processBlock(event.Block)
case core.ChainSplitEvent: case core.ChainSplitEvent:
self.processBlock(ev.Block) self.processBlock(event.Block)
} }
} }
self.events.Unsubscribe()
} }
func (self *GasPriceOracle) processBlock(block *types.Block) { func (self *GasPriceOracle) processBlock(block *types.Block) {

@ -687,7 +687,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
func (self *ProtocolManager) minedBroadcastLoop() { func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe // automatically stops if unsubscribe
for obj := range self.minedBlockSub.Chan() { for obj := range self.minedBlockSub.Chan() {
switch ev := obj.(type) { switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent: case core.NewMinedBlockEvent:
self.BroadcastBlock(ev.Block, true) // First propagate block to peers self.BroadcastBlock(ev.Block, true) // First propagate block to peers
self.BroadcastBlock(ev.Block, false) // Only then announce to the rest self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
@ -698,7 +698,7 @@ func (self *ProtocolManager) minedBroadcastLoop() {
func (self *ProtocolManager) txBroadcastLoop() { func (self *ProtocolManager) txBroadcastLoop() {
// automatically stops if unsubscribe // automatically stops if unsubscribe
for obj := range self.txSub.Chan() { for obj := range self.txSub.Chan() {
event := obj.(core.TxPreEvent) event := obj.Data.(core.TxPreEvent)
self.BroadcastTx(event.Tx.Hash(), event.Tx) self.BroadcastTx(event.Tx.Hash(), event.Tx)
} }
} }

@ -22,14 +22,21 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"time"
) )
// Event is a time-tagged notification pushed to subscribers.
type Event struct {
Time time.Time
Data interface{}
}
// Subscription is implemented by event subscriptions. // Subscription is implemented by event subscriptions.
type Subscription interface { type Subscription interface {
// Chan returns a channel that carries events. // Chan returns a channel that carries events.
// Implementations should return the same channel // Implementations should return the same channel
// for any subsequent calls to Chan. // for any subsequent calls to Chan.
Chan() <-chan interface{} Chan() <-chan *Event
// Unsubscribe stops delivery of events to a subscription. // Unsubscribe stops delivery of events to a subscription.
// The event channel is closed. // The event channel is closed.
@ -82,6 +89,10 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
// Post sends an event to all receivers registered for the given type. // Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped. // It returns ErrMuxClosed if the mux has been stopped.
func (mux *TypeMux) Post(ev interface{}) error { func (mux *TypeMux) Post(ev interface{}) error {
event := &Event{
Time: time.Now(),
Data: ev,
}
rtyp := reflect.TypeOf(ev) rtyp := reflect.TypeOf(ev)
mux.mutex.RLock() mux.mutex.RLock()
if mux.stopped { if mux.stopped {
@ -91,7 +102,7 @@ func (mux *TypeMux) Post(ev interface{}) error {
subs := mux.subm[rtyp] subs := mux.subm[rtyp]
mux.mutex.RUnlock() mux.mutex.RUnlock()
for _, sub := range subs { for _, sub := range subs {
sub.deliver(ev) sub.deliver(event)
} }
return nil return nil
} }
@ -143,6 +154,7 @@ func posdelete(slice []*muxsub, pos int) []*muxsub {
type muxsub struct { type muxsub struct {
mux *TypeMux mux *TypeMux
created time.Time
closeMu sync.Mutex closeMu sync.Mutex
closing chan struct{} closing chan struct{}
closed bool closed bool
@ -151,21 +163,22 @@ type muxsub struct {
// postC can be set to nil without affecting the return value of // postC can be set to nil without affecting the return value of
// Chan. // Chan.
postMu sync.RWMutex postMu sync.RWMutex
readC <-chan interface{} readC <-chan *Event
postC chan<- interface{} postC chan<- *Event
} }
func newsub(mux *TypeMux) *muxsub { func newsub(mux *TypeMux) *muxsub {
c := make(chan interface{}) c := make(chan *Event)
return &muxsub{ return &muxsub{
mux: mux, mux: mux,
created: time.Now(),
readC: c, readC: c,
postC: c, postC: c,
closing: make(chan struct{}), closing: make(chan struct{}),
} }
} }
func (s *muxsub) Chan() <-chan interface{} { func (s *muxsub) Chan() <-chan *Event {
return s.readC return s.readC
} }
@ -189,11 +202,17 @@ func (s *muxsub) closewait() {
s.postMu.Unlock() s.postMu.Unlock()
} }
func (s *muxsub) deliver(ev interface{}) { func (s *muxsub) deliver(event *Event) {
// Short circuit delivery if stale event
if s.created.After(event.Time) {
return
}
// Otherwise deliver the event
s.postMu.RLock() s.postMu.RLock()
defer s.postMu.RUnlock()
select { select {
case s.postC <- ev: case s.postC <- event:
case <-s.closing: case <-s.closing:
} }
s.postMu.RUnlock()
} }

@ -37,7 +37,7 @@ func TestSub(t *testing.T) {
}() }()
ev := <-sub.Chan() ev := <-sub.Chan()
if ev.(testEvent) != testEvent(5) { if ev.Data.(testEvent) != testEvent(5) {
t.Errorf("Got %v (%T), expected event %v (%T)", t.Errorf("Got %v (%T), expected event %v (%T)",
ev, ev, testEvent(5), testEvent(5)) ev, ev, testEvent(5), testEvent(5))
} }

@ -30,7 +30,7 @@ func ExampleTypeMux() {
sub := mux.Subscribe(someEvent{}, otherEvent{}) sub := mux.Subscribe(someEvent{}, otherEvent{})
go func() { go func() {
for event := range sub.Chan() { for event := range sub.Chan() {
fmt.Printf("Received: %#v\n", event) fmt.Printf("Received: %#v\n", event.Data)
} }
fmt.Println("done") fmt.Println("done")
close(done) close(done)

@ -66,7 +66,7 @@ func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out: out:
for ev := range events.Chan() { for ev := range events.Chan() {
switch ev.(type) { switch ev.Data.(type) {
case downloader.StartEvent: case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0) atomic.StoreInt32(&self.canStart, 0)
if self.Mining() { if self.Mining() {

@ -215,13 +215,20 @@ func (self *worker) register(agent Agent) {
} }
func (self *worker) update() { func (self *worker) update() {
events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
defer eventSub.Unsubscribe()
out: eventCh := eventSub.Chan()
for { for {
select { select {
case event := <-events.Chan(): case event, ok := <-eventCh:
switch ev := event.(type) { if !ok {
// Event subscription closed, set the channel to nil to stop spinning
eventCh = nil
continue
}
// A real event arrived, process interesting content
switch ev := event.Data.(type) {
case core.ChainHeadEvent: case core.ChainHeadEvent:
self.commitNewWork() self.commitNewWork()
case core.ChainSideEvent: case core.ChainSideEvent:
@ -237,11 +244,9 @@ out:
} }
} }
case <-self.quit: case <-self.quit:
break out return
} }
} }
events.Unsubscribe()
} }
func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) (minedBlocks *uint64RingBuffer) { func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) (minedBlocks *uint64RingBuffer) {

@ -244,30 +244,37 @@ func (self *XEth) State() *State { return self.state }
func (self *XEth) UpdateState() (wait chan *big.Int) { func (self *XEth) UpdateState() (wait chan *big.Int) {
wait = make(chan *big.Int) wait = make(chan *big.Int)
go func() { go func() {
sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) eventSub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{})
defer eventSub.Unsubscribe()
var m, n *big.Int var m, n *big.Int
var ok bool var ok bool
out:
eventCh := eventSub.Chan()
for { for {
select { select {
case event := <-sub.Chan(): case event, ok := <-eventCh:
ev, ok := event.(core.ChainHeadEvent) if !ok {
if ok { // Event subscription closed, set the channel to nil to stop spinning
m = ev.Block.Number() eventCh = nil
continue
}
// A real event arrived, process if new head block assignment
if event, ok := event.Data.(core.ChainHeadEvent); ok {
m = event.Block.Number()
if n != nil && n.Cmp(m) < 0 { if n != nil && n.Cmp(m) < 0 {
wait <- n wait <- n
n = nil n = nil
} }
statedb := state.New(ev.Block.Root(), self.backend.ChainDb()) statedb := state.New(event.Block.Root(), self.backend.ChainDb())
self.state = NewState(self, statedb) self.state = NewState(self, statedb)
} }
case n, ok = <-wait: case n, ok = <-wait:
if !ok { if !ok {
break out return
} }
} }
} }
sub.Unsubscribe()
}() }()
return return
} }