Merge pull request #2205 from obscuren/pending-filters

eth/filters:  pending logs 
This commit is contained in:
Péter Szilágyi 2016-02-13 14:53:59 +02:00
commit cb85923828
8 changed files with 143 additions and 52 deletions

@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
go self.eventMux.Post(RemovedTransactionEvent{diff}) go self.eventMux.Post(RemovedTransactionEvent{diff})
} }
if len(deletedLogs) > 0 { if len(deletedLogs) > 0 {
go self.eventMux.Post(RemovedLogEvent{deletedLogs}) go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
} }
return nil return nil

@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) {
evmux := &event.TypeMux{} evmux := &event.TypeMux{}
blockchain, _ := NewBlockChain(db, FakePow{}, evmux) blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
subs := evmux.Subscribe(RemovedLogEvent{}) subs := evmux.Subscribe(RemovedLogsEvent{})
chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) { chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) {
if i == 1 { if i == 1 {
tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1) tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1)
@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) {
} }
ev := <-subs.Chan() ev := <-subs.Chan()
if len(ev.Data.(RemovedLogEvent).Logs) == 0 { if len(ev.Data.(RemovedLogsEvent).Logs) == 0 {
t.Error("expected logs") t.Error("expected logs")
} }
} }

@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction }
// TxPostEvent is posted when a transaction has been processed. // TxPostEvent is posted when a transaction has been processed.
type TxPostEvent struct{ Tx *types.Transaction } type TxPostEvent struct{ Tx *types.Transaction }
// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Logs vm.Logs
}
// NewBlockEvent is posted when a block has been imported. // NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block } type NewBlockEvent struct{ Block *types.Block }
@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block }
type RemovedTransactionEvent struct{ Txs types.Transactions } type RemovedTransactionEvent struct{ Txs types.Transactions }
// RemovedLogEvent is posted when a reorg happens // RemovedLogEvent is posted when a reorg happens
type RemovedLogEvent struct{ Logs vm.Logs } type RemovedLogsEvent struct{ Logs vm.Logs }
// ChainSplit is posted when a new head is detected // ChainSplit is posted when a new head is detected
type ChainSplitEvent struct { type ChainSplitEvent struct {

@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
s.blockMu.Lock() s.blockMu.Lock()
filter := New(s.chainDb) filter := New(s.chainDb)
id := s.filterManager.Add(filter) id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}
s.blockQueue[id] = &hashQueue{timeout: time.Now()} s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) { filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
defer s.transactionMu.Unlock() defer s.transactionMu.Unlock()
filter := New(s.chainDb) filter := New(s.chainDb)
id := s.filterManager.Add(filter) id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}
s.transactionQueue[id] = &hashQueue{timeout: time.Now()} s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) { filter.TransactionCallback = func(tx *types.Transaction) {
@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
} }
// newLogFilter creates a new log filter. // newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int { func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
s.logMu.Lock() s.logMu.Lock()
defer s.logMu.Unlock() defer s.logMu.Unlock()
filter := New(s.chainDb) filter := New(s.chainDb)
id := s.filterManager.Add(filter) id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
}
s.logQueue[id] = &logQueue{timeout: time.Now()} s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest) filter.SetBeginBlock(earliest)
@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
} }
} }
return id return id, nil
} }
// NewFilterArgs represents a request to create a new filter. // NewFilterArgs represents a request to create a new filter.
@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int var id int
if len(args.Addresses) > 0 { if len(args.Addresses) > 0 {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else { } else {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
}
if err != nil {
return "", err
} }
s.filterMapMu.Lock() s.filterMapMu.Lock()

@ -18,6 +18,7 @@ package filters
import ( import (
"math" "math"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -32,6 +33,8 @@ type AccountChange struct {
// Filtering interface // Filtering interface
type Filter struct { type Filter struct {
created time.Time
db ethdb.Database db ethdb.Database
begin, end int64 begin, end int64
addresses []common.Address addresses []common.Address

@ -19,6 +19,7 @@
package filters package filters
import ( import (
"fmt"
"sync" "sync"
"time" "time"
@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
) )
// FilterType determines the type of filter and is used to put the filter in to
// the correct bucket when added.
type FilterType byte
const (
ChainFilter FilterType = iota // new block events filter
PendingTxFilter // pending transaction filter
LogFilter // new or removed log filter
PendingLogFilter // pending log filter
)
// FilterSystem manages filters that filter specific events such as // FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen // block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine). // for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct { type FilterSystem struct {
filterMu sync.RWMutex filterMu sync.RWMutex
filterId int filterId int
filters map[int]*Filter
created map[int]time.Time chainFilters map[int]*Filter
sub event.Subscription pendingTxFilters map[int]*Filter
logFilters map[int]*Filter
pendingLogFilters map[int]*Filter
// generic is an ugly hack for Get
generic map[int]*Filter
sub event.Subscription
} }
// NewFilterSystem returns a newly allocated filter manager // NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem { func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{ fs := &FilterSystem{
filters: make(map[int]*Filter), chainFilters: make(map[int]*Filter),
created: make(map[int]time.Time), pendingTxFilters: make(map[int]*Filter),
logFilters: make(map[int]*Filter),
pendingLogFilters: make(map[int]*Filter),
generic: make(map[int]*Filter),
} }
fs.sub = mux.Subscribe( fs.sub = mux.Subscribe(
//core.PendingBlockEvent{}, core.PendingLogsEvent{},
core.RemovedLogEvent{}, core.RemovedLogsEvent{},
core.ChainEvent{}, core.ChainEvent{},
core.TxPreEvent{}, core.TxPreEvent{},
vm.Logs(nil), vm.Logs(nil),
@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
} }
// Add adds a filter to the filter manager // Add adds a filter to the filter manager
func (fs *FilterSystem) Add(filter *Filter) (id int) { func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock() fs.filterMu.Lock()
defer fs.filterMu.Unlock() defer fs.filterMu.Unlock()
id = fs.filterId
fs.filters[id] = filter id := fs.filterId
fs.created[id] = time.Now() filter.created = time.Now()
switch filterType {
case ChainFilter:
fs.chainFilters[id] = filter
case PendingTxFilter:
fs.pendingTxFilters[id] = filter
case LogFilter:
fs.logFilters[id] = filter
case PendingLogFilter:
fs.pendingLogFilters[id] = filter
default:
return 0, fmt.Errorf("unknown filter type %v", filterType)
}
fs.generic[id] = filter
fs.filterId++ fs.filterId++
return id return id, nil
} }
// Remove removes a filter by filter id // Remove removes a filter by filter id
@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock() fs.filterMu.Lock()
defer fs.filterMu.Unlock() defer fs.filterMu.Unlock()
delete(fs.filters, id) delete(fs.chainFilters, id)
delete(fs.created, id) delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id)
delete(fs.pendingLogFilters, id)
delete(fs.generic, id)
} }
// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter { func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock() fs.filterMu.RLock()
defer fs.filterMu.RUnlock() defer fs.filterMu.RUnlock()
return fs.filters[id] return fs.generic[id]
} }
// filterLoop waits for specific events from ethereum and fires their handlers // filterLoop waits for specific events from ethereum and fires their handlers
@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) { switch ev := event.Data.(type) {
case core.ChainEvent: case core.ChainEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for _, filter := range fs.chainFilters {
if filter.BlockCallback != nil && !fs.created[id].After(event.Time) { if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs) filter.BlockCallback(ev.Block, ev.Logs)
} }
} }
fs.filterMu.RUnlock() fs.filterMu.RUnlock()
case core.TxPreEvent: case core.TxPreEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for _, filter := range fs.pendingTxFilters {
if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) { if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx) filter.TransactionCallback(ev.Tx)
} }
} }
@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs: case vm.Logs:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !fs.created[id].After(event.Time) { if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) { for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false) filter.LogCallback(log, false)
} }
} }
} }
fs.filterMu.RUnlock() fs.filterMu.RUnlock()
case core.RemovedLogsEvent:
case core.RemovedLogEvent:
fs.filterMu.RLock() fs.filterMu.RLock()
for id, filter := range fs.filters { for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !fs.created[id].After(event.Time) { if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs { for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true) filter.LogCallback(removedLog, true)
} }
} }
} }
fs.filterMu.RUnlock() fs.filterMu.RUnlock()
case core.PendingLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingLogFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, pendingLog := range ev.Logs {
filter.LogCallback(pendingLog, false)
}
}
}
fs.filterMu.RUnlock()
} }
} }
} }

@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) {
txDone = make(chan struct{}) txDone = make(chan struct{})
logDone = make(chan struct{}) logDone = make(chan struct{})
removedLogDone = make(chan struct{}) removedLogDone = make(chan struct{})
pendingLogDone = make(chan struct{})
) )
blockFilter := &Filter{ blockFilter := &Filter{
@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) {
} }
}, },
} }
removedLogFilter := &Filter{ removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) { LogCallback: func(l *vm.Log, oob bool) {
if oob { if oob {
@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) {
} }
}, },
} }
pendingLogFilter := &Filter{
LogCallback: func(*vm.Log, bool) {
close(pendingLogDone)
},
}
fs.Add(blockFilter) fs.Add(blockFilter, ChainFilter)
fs.Add(txFilter) fs.Add(txFilter, PendingTxFilter)
fs.Add(logFilter) fs.Add(logFilter, LogFilter)
fs.Add(removedLogFilter) fs.Add(removedLogFilter, LogFilter)
fs.Add(pendingLogFilter, PendingLogFilter)
mux.Post(core.ChainEvent{}) mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{}) mux.Post(core.TxPreEvent{})
mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}}) mux.Post(vm.Logs{&vm.Log{}})
mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}})
mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}})
const dura = 5 * time.Second const dura = 5 * time.Second
failTimer := time.NewTimer(dura) failTimer := time.NewTimer(dura)
@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) {
case <-failTimer.C: case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)") t.Error("removed log filter failed to trigger (timeout)")
} }
failTimer.Reset(dura)
select {
case <-pendingLogDone:
case <-failTimer.C:
t.Error("pending log filter failed to trigger (timout)")
}
} }

@ -243,7 +243,7 @@ func (self *worker) update() {
// Apply transaction to the pending state if we're not mining // Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 { if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock() self.currentMu.Lock()
self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain) self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
self.currentMu.Unlock() self.currentMu.Unlock()
} }
} }
@ -529,7 +529,7 @@ func (self *worker) commitNewWork() {
transactions := append(singleTxOwner, multiTxOwner...) transactions := append(singleTxOwner, multiTxOwner...)
*/ */
work.commitTransactions(transactions, self.gasPrice, self.chain) work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
self.eth.TxPool().RemoveTransactions(work.lowGasTxs) self.eth.TxPool().RemoveTransactions(work.lowGasTxs)
// compute uncles for the new block. // compute uncles for the new block.
@ -588,8 +588,10 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil return nil
} }
func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
gp := new(core.GasPool).AddGas(env.header.GasLimit) gp := new(core.GasPool).AddGas(env.header.GasLimit)
var coalescedLogs vm.Logs
for _, tx := range transactions { for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool // We can skip err. It has already been validated in the tx pool
from, _ := tx.From() from, _ := tx.From()
@ -627,7 +629,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
env.state.StartRecord(tx.Hash(), common.Hash{}, 0) env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
err := env.commitTransaction(tx, bc, gp) err, logs := env.commitTransaction(tx, bc, gp)
switch { switch {
case core.IsGasLimitErr(err): case core.IsGasLimitErr(err):
// ignore the transactor so no nonce errors will be thrown for this account // ignore the transactor so no nonce errors will be thrown for this account
@ -643,20 +645,25 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
} }
default: default:
env.tcount++ env.tcount++
coalescedLogs = append(coalescedLogs, logs...)
} }
} }
if len(coalescedLogs) > 0 {
go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs})
}
} }
func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error { func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (error, vm.Logs) {
snap := env.state.Copy() snap := env.state.Copy()
receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed) receipt, logs, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
if err != nil { if err != nil {
env.state.Set(snap) env.state.Set(snap)
return err return err, nil
} }
env.txs = append(env.txs, tx) env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt) env.receipts = append(env.receipts, receipt)
return nil
return nil, logs
} }
// TODO: remove or use // TODO: remove or use