diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 05bf649d38..8ef9d9e7e8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1959,7 +1959,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf }) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) return filterSystem } diff --git a/eth/filters/api.go b/eth/filters/api.go index 56a9de1b21..23fb1faca8 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -34,10 +34,11 @@ import ( ) var ( - errInvalidTopic = errors.New("invalid topic(s)") - errFilterNotFound = errors.New("filter not found") - errInvalidBlockRange = errors.New("invalid block range params") - errExceedMaxTopics = errors.New("exceed max topics") + errInvalidTopic = errors.New("invalid topic(s)") + errFilterNotFound = errors.New("filter not found") + errInvalidBlockRange = errors.New("invalid block range params") + errPendingLogsUnsupported = errors.New("pending logs are not supported") + errExceedMaxTopics = errors.New("exceed max topics") ) // The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 @@ -70,10 +71,10 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { +func NewFilterAPI(system *FilterSystem) *FilterAPI { api := &FilterAPI{ sys: system, - events: NewEventSystem(system, lightMode), + events: NewEventSystem(system), filters: make(map[rpc.ID]*filter), timeout: system.cfg.Timeout, } @@ -456,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.txs = nil return hashes, nil } - case LogsSubscription, MinedAndPendingLogsSubscription: + case LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 2f59026b73..09ccb93907 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -108,19 +108,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.blockLogs(ctx, header) } - var ( - beginPending = f.begin == rpc.PendingBlockNumber.Int64() - endPending = f.end == rpc.PendingBlockNumber.Int64() - ) - - // special case for pending logs - if beginPending && !endPending { - return nil, errInvalidBlockRange - } - - // Short-cut if all we care about is pending logs - if beginPending && endPending { - return f.pendingLogs(), nil + // Disallow pending logs. + if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() { + return nil, errPendingLogsUnsupported } resolveSpecial := func(number int64) (int64, error) { @@ -165,16 +155,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { case log := <-logChan: logs = append(logs, log) case err := <-errChan: - if err != nil { - // if an error occurs during extraction, we do return the extracted data - return logs, err - } - // Append the pending ones - if endPending { - pendingLogs := f.pendingLogs() - logs = append(logs, pendingLogs...) - } - return logs, nil + return logs, err } } } @@ -332,22 +313,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ return logs, nil } -// pendingLogs returns the logs matching the filter criteria within the pending block. -func (f *Filter) pendingLogs() []*types.Log { - block, receipts, _ := f.sys.backend.Pending() - if block == nil || receipts == nil { - return nil - } - if bloomFilter(block.Bloom(), f.addresses, f.topics) { - var unfiltered []*types.Log - for _, r := range receipts { - unfiltered = append(unfiltered, r.Logs...) - } - return filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - } - return nil -} - // filterLogs creates a slice of logs matching the given criteria. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { var check = func(log *types.Log) bool { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index d8b41a4259..a3a2787a41 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,8 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -63,7 +61,6 @@ type Backend interface { GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) - Pending() (*types.Block, types.Receipts, *state.StateDB) CurrentHeader() *types.Header ChainConfig() *params.ChainConfig @@ -152,10 +149,6 @@ const ( UnknownSubscription Type = iota // LogsSubscription queries for new or removed (chain reorg) logs LogsSubscription - // PendingLogsSubscription queries for logs in pending blocks - PendingLogsSubscription - // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. - MinedAndPendingLogsSubscription // PendingTransactionsSubscription queries for pending transactions entering // the pending state PendingTransactionsSubscription @@ -192,10 +185,8 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - backend Backend - sys *FilterSystem - lightMode bool - lastHead *types.Header + backend Backend + sys *FilterSystem // Subscriptions txsSub event.Subscription // Subscription for new transaction event @@ -218,11 +209,10 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem) *EventSystem { m := &EventSystem{ sys: sys, backend: sys.backend, - lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), txsCh: make(chan core.NewTxsEvent, txChanSize), @@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ to = rpc.BlockNumber(crit.ToBlock.Int64()) } - // only interested in pending logs - if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribePendingLogs(crit, logs), nil + // Pending logs are not supported anymore. + if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber { + return nil, errPendingLogsUnsupported } + // only interested in new mined logs if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -322,10 +313,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ if from >= 0 && to >= 0 && to >= from { return es.subscribeLogs(crit, logs), nil } - // interested in mined logs from a specific block number, new logs and pending logs - if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribeMinedPendingLogs(crit, logs), nil - } // interested in logs from a specific block number to new mined blocks if from >= 0 && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -333,23 +320,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return nil, errInvalidBlockRange } -// subscribeMinedPendingLogs creates a subscription that returned mined and -// pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { @@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return es.subscribe(sub) } -// subscribePendingLogs creates a subscription that writes contract event logs for -// transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // SubscribeNewHeads creates a subscription that writes the header of a block that is // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { @@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { } } -func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) { - if len(logs) == 0 { - return - } - for _, f := range filters[PendingLogsSubscription] { - matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) - if len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } -} - func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { for _, f := range filters[PendingTransactionsSubscription] { f.txs <- ev.Txs @@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 { - continue - } - if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 { - continue - } - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } -} - -func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { - oldh := es.lastHead - es.lastHead = newHeader - if oldh == nil { - return - } - newh := newHeader - // find common ancestor, create list of rolled back and new block hashes - var oldHeaders, newHeaders []*types.Header - for oldh.Hash() != newh.Hash() { - if oldh.Number.Uint64() >= newh.Number.Uint64() { - oldHeaders = append(oldHeaders, oldh) - oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) - } - if oldh.Number.Uint64() < newh.Number.Uint64() { - newHeaders = append(newHeaders, newh) - newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) - if newh == nil { - // happens when CHT syncing, nothing to do - newh = oldh - } - } - } - // roll back old blocks - for _, h := range oldHeaders { - callBack(h, true) - } - // check new blocks (array is in reverse order) - for i := len(newHeaders) - 1; i >= 0; i-- { - callBack(newHeaders[i], false) - } -} - -// filter logs of a single header in light client mode -func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { - if !bloomFilter(header.Bloom, addresses, topics) { - return nil - } - // Get the logs of the block - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - unfiltered := append([]*types.Log{}, cached.logs...) - for i, log := range unfiltered { - // Don't modify in-cache elements - logcopy := *log - logcopy.Removed = remove - // Swap copy in-place - unfiltered[i] = &logcopy - } - logs := filterLogs(unfiltered, nil, nil, addresses, topics) - // Txhash is already resolved - if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) { - return logs - } - // Resolve txhash - body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - for _, log := range logs { - // logs are already copied, safe to modify - log.TxHash = body.Transactions[log.TxIndex].Hash() - } - return logs } // eventLoop (un)installs filters and processes mux events. @@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() { es.handleLogs(index, ev.Logs) case ev := <-es.chainCh: es.handleChainEvent(index, ev) - // If we have no pending log subscription, - // we don't need to collect any pending logs. - if len(index[PendingLogsSubscription]) == 0 { - continue - } - - // Pull the pending logs if there is a new chain head. - pendingBlock, pendingReceipts, _ := es.backend.Pending() - if pendingBlock == nil || pendingReceipts == nil { - continue - } - if pendingBlock.ParentHash() != ev.Block.Hash() { - continue - } - var logs []*types.Log - for _, receipt := range pendingReceipts { - if len(receipt.Logs) > 0 { - logs = append(logs, receipt.Logs...) - } - } - es.handlePendingLogs(index, logs) case f := <-es.install: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - index[LogsSubscription][f.id] = f - index[PendingLogsSubscription][f.id] = f - } else { - index[f.typ][f.id] = f - } + index[f.typ][f.id] = f close(f.installed) case f := <-es.uninstall: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - delete(index[LogsSubscription], f.id) - delete(index[PendingLogsSubscription], f.id) - } else { - delete(index[f.typ], f.id) - } + delete(index[f.typ], f.id) close(f.err) // System stopped diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 4a0f40cce9..013b9f7bc2 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -19,7 +19,6 @@ package filters import ( "context" "errors" - "fmt" "math/big" "math/rand" "reflect" @@ -27,15 +26,12 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -125,10 +121,6 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint return logs, nil } -func (b *testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { - return b.pendingBlock, b.pendingReceipts, nil -} - func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -181,15 +173,6 @@ func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { b.pendingReceipts = receipts } -func (b *testBackend) notifyPending(logs []*types.Log) { - genesis := &core.Genesis{ - Config: params.TestChainConfig, - } - _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, b *core.BlockGen) {}) - b.setPending(blocks[1], []*types.Receipt{{Logs: logs}}) - b.chainFeed.Send(core.ChainEvent{Block: blocks[0]}) -} - func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { backend := &testBackend{db: db} sys := NewFilterSystem(backend, cfg) @@ -207,7 +190,7 @@ func TestBlockSubscription(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) genesis = &core.Genesis{ Config: params.TestChainConfig, BaseFee: big.NewInt(params.InitialBaseFee), @@ -262,7 +245,7 @@ func TestPendingTxFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -318,7 +301,7 @@ func TestPendingTxFilterFullTx(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -374,7 +357,7 @@ func TestLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) testCases = []struct { crit FilterCriteria @@ -386,8 +369,6 @@ func TestLogFilterCreation(t *testing.T) { {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true}, // "mined" block range to pending {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true}, - // new mined and pending blocks - {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true}, // from block "higher" than to block {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false}, // from block "higher" than to block @@ -423,7 +404,7 @@ func TestInvalidLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) // different situations where log filter creation should fail. @@ -449,7 +430,7 @@ func TestInvalidGetLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -475,7 +456,7 @@ func TestInvalidGetRangeLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) if _, err := api.GetLogs(context.Background(), FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}); err != errInvalidBlockRange { @@ -490,7 +471,7 @@ func TestLogFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -509,9 +490,6 @@ func TestLogFilter(t *testing.T) { {Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}, } - expectedCase7 = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]} - expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]} - testCases = []struct { crit FilterCriteria expected []*types.Log @@ -529,20 +507,14 @@ func TestLogFilter(t *testing.T) { 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, // match logs based on multiple addresses and "or" topics 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""}, - // logs in the pending block - 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""}, - // mined logs with block num >= 2 or pending logs - 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""}, // all "mined" logs with block num >= 2 - 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, + 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, // all "mined" logs - 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, + 7: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, // all "mined" logs with 1>= block num <=2 and topic secondTopic - 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - // all "mined" and pending logs with topic firstTopic - 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""}, + 8: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, // match all logs due to wildcard topic - 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, + 9: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, } ) @@ -557,16 +529,13 @@ func TestLogFilter(t *testing.T) { t.Fatal("Logs event not delivered") } - // set pending logs - backend.notifyPending(allLogs) - for i, tt := range testCases { var fetched []*types.Log timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) + t.Fatalf("test %d: unable to fetch logs: %v", i, err) } fetched = append(fetched, results.([]*types.Log)...) @@ -597,326 +566,6 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. -func TestPendingLogsSubscription(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") - fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") - notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") - - allLogs = [][]*types.Log{ - {{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}, - {{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}, - {{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}, - { - {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, - {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - }, - } - - pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64()) - - testCases = []struct { - crit ethereum.FilterQuery - expected []*types.Log - c chan []*types.Log - sub *Subscription - err chan error - }{ - // match all - { - ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to no matching addresses - { - ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses, ignore topics - { - ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[:2]), allLogs[5][3]), - nil, nil, nil, - }, - // match none due to no matching topics (match with address) - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses and topics - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[3:5]), allLogs[5][0]), - nil, nil, nil, - }, - // match logs based on multiple addresses and "or" topics - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[2:5]), allLogs[5][0]), - nil, nil, nil, - }, - // multiple pending logs, should match only 2 topics from the logs in block 5 - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - []*types.Log{allLogs[5][0], allLogs[5][2]}, - nil, nil, nil, - }, - // match none due to only matching new mined logs - { - ethereum.FilterQuery{}, - nil, - nil, nil, nil, - }, - // match none due to only matching mined logs within a specific block range - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, - nil, - nil, nil, nil, - }, - // match all due to matching mined and pending logs - { - ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to matching logs from a specific block number to new mined blocks - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, - nil, - nil, nil, nil, - }, - } - ) - - // create all subscriptions, this ensures all subscriptions are created before the events are posted. - // on slow machines this could otherwise lead to missing events when the subscription is created after - // (some) events are posted. - for i := range testCases { - testCases[i].c = make(chan []*types.Log) - testCases[i].err = make(chan error, 1) - - var err error - testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) - if err != nil { - t.Fatalf("SubscribeLogs %d failed: %v\n", i, err) - } - } - - for n, test := range testCases { - i := n - tt := test - go func() { - defer tt.sub.Unsubscribe() - - var fetched []*types.Log - - timeout := time.After(1 * time.Second) - fetchLoop: - for { - select { - case logs := <-tt.c: - // Do not break early if we've fetched greater, or equal, - // to the number of logs expected. This ensures we do not - // deadlock the filter system because it will do a blocking - // send on this channel if another log arrives. - fetched = append(fetched, logs...) - case <-timeout: - break fetchLoop - } - } - - if len(fetched) != len(tt.expected) { - tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i) - return - } - if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i) - return - } - } - tt.err <- nil - }() - } - - // set pending logs - var flattenLogs []*types.Log - for _, logs := range allLogs { - flattenLogs = append(flattenLogs, logs...) - } - backend.notifyPending(flattenLogs) - - for i := range testCases { - err := <-testCases[i].err - if err != nil { - t.Fatalf("test %d failed: %v", i, err) - } - <-testCases[i].sub.Err() - } -} - -func TestLightFilterLogs(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, true) - signer = types.HomesteadSigner{} - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - - // posted twice, once as regular logs and once as pending logs. - allLogs = []*types.Log{ - // Block 1 - {Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0}, - // Block 2 - {Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0}, - {Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1}, - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2}, - // Block 3 - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0}, - } - - testCases = []struct { - crit FilterCriteria - expected []*types.Log - id rpc.ID - }{ - // match all - 0: {FilterCriteria{}, allLogs, ""}, - // match none due to no matching addresses - 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, - // match logs based on addresses, ignore topics - 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, - // match logs based on addresses and topics - 3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, - // all logs with block num >= 3 - 4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""}, - // all logs - 5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""}, - // all logs with 1>= block num <=2 and topic secondTopic - 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - } - - key, _ = crypto.GenerateKey() - addr = crypto.PubkeyToAddress(key.PublicKey) - genesis = &core.Genesis{Config: params.TestChainConfig, - Alloc: types.GenesisAlloc{ - addr: {Balance: big.NewInt(params.Ether)}, - }, - } - receipts = []*types.Receipt{{ - Logs: []*types.Log{allLogs[0]}, - }, { - Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]}, - }, { - Logs: []*types.Log{allLogs[4]}, - }} - ) - - _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) { - if i == 0 { - return - } - receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]}) - b.AddUncheckedReceipt(receipts[i-1]) - tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: b.BaseFee(), Data: nil}), signer, key) - b.AddTx(tx) - }) - for i, block := range blocks { - rawdb.WriteBlock(db, block) - rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(db, block.Hash()) - if i > 0 { - rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]}) - } - } - // create all filters - for i := range testCases { - id, err := api.NewFilter(testCases[i].crit) - if err != nil { - t.Fatal(err) - } - testCases[i].id = id - } - - // raise events - time.Sleep(1 * time.Second) - for _, block := range blocks { - backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs}) - } - - for i, tt := range testCases { - var fetched []*types.Log - timeout := time.Now().Add(1 * time.Second) - for { // fetch all expected logs - results, err := api.GetFilterChanges(tt.id) - if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) - } - fetched = append(fetched, results.([]*types.Log)...) - if len(fetched) >= len(tt.expected) { - break - } - // check timeout - if time.Now().After(timeout) { - break - } - - time.Sleep(100 * time.Millisecond) - } - - if len(fetched) != len(tt.expected) { - t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - t.Errorf("expected log not to be removed for log %d in case %d", l, i) - } - expected := *tt.expected[l] - blockNum := expected.BlockNumber - 1 - expected.BlockHash = blocks[blockNum].Hash() - expected.TxHash = blocks[blockNum].Transactions()[0].Hash() - if !reflect.DeepEqual(fetched[l], &expected) { - t.Errorf("invalid log on index %d for case %d", l, i) - } - } - } -} - // TestPendingTxFilterDeadlock tests if the event loop hangs when pending // txes arrive at the same time that one of multiple filters is timing out. // Please refer to #22131 for more details. @@ -927,7 +576,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) done = make(chan struct{}) ) @@ -979,11 +628,3 @@ func TestPendingTxFilterDeadlock(t *testing.T) { } } } - -func flattenLogs(pl [][]*types.Log) []*types.Log { - var logs []*types.Log - for _, l := range pl { - logs = append(logs, l...) - } - return logs -} diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 48aaa584db..2b3efb51b1 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -344,16 +344,16 @@ func TestFilters(t *testing.T) { err: "safe header not found", }, { - f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + err: errPendingLogsUnsupported.Error(), }, { - f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil), + err: errPendingLogsUnsupported.Error(), }, { f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil), - err: errInvalidBlockRange.Error(), + err: errPendingLogsUnsupported.Error(), }, } { logs, err := tc.f.Logs(context.Background()) @@ -375,7 +375,7 @@ func TestFilters(t *testing.T) { } t.Run("timeout", func(t *testing.T) { - f := sys.NewRangeFilter(0, -1, nil, nil) + f := sys.NewRangeFilter(0, rpc.LatestBlockNumber.Int64(), nil, nil) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) defer cancel() _, err := f.Logs(ctx) diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index 1cd9c5389b..59ad370146 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -65,7 +65,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{}) n.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) // Import the test chain. diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index d6fb886ad8..6e07aa68d0 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -114,7 +114,7 @@ func newWithNode(stack *node.Node, conf *eth.Config, blockPeriod uint64) (*Backe filterSystem := filters.NewFilterSystem(backend.APIBackend, filters.Config{}) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) // Start the node if err := stack.Start(); err != nil {