eth/filters: add global block logs cache (#25459)
This adds a cache for block logs which is shared by all filters. The cache size of is configurable using the `--cache.blocklogs` flag. Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
77308cd6fc
commit
36874b63a1
@ -68,7 +68,8 @@ type SimulatedBackend struct {
|
|||||||
pendingState *state.StateDB // Currently pending state that will be the active on request
|
pendingState *state.StateDB // Currently pending state that will be the active on request
|
||||||
pendingReceipts types.Receipts // Currently receipts for the pending block
|
pendingReceipts types.Receipts // Currently receipts for the pending block
|
||||||
|
|
||||||
events *filters.EventSystem // Event system for filtering log events live
|
events *filters.EventSystem // for filtering log events live
|
||||||
|
filterSystem *filters.FilterSystem // for filtering database logs
|
||||||
|
|
||||||
config *params.ChainConfig
|
config *params.ChainConfig
|
||||||
}
|
}
|
||||||
@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
|
|||||||
blockchain: blockchain,
|
blockchain: blockchain,
|
||||||
config: genesis.Config,
|
config: genesis.Config,
|
||||||
}
|
}
|
||||||
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
|
|
||||||
|
filterBackend := &filterBackend{database, blockchain, backend}
|
||||||
|
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
|
||||||
|
backend.events = filters.NewEventSystem(backend.filterSystem, false)
|
||||||
|
|
||||||
backend.rollback(blockchain.CurrentBlock())
|
backend.rollback(blockchain.CurrentBlock())
|
||||||
return backend
|
return backend
|
||||||
}
|
}
|
||||||
@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
|
|||||||
var filter *filters.Filter
|
var filter *filters.Filter
|
||||||
if query.BlockHash != nil {
|
if query.BlockHash != nil {
|
||||||
// Block filter requested, construct a single-shot filter
|
// Block filter requested, construct a single-shot filter
|
||||||
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
|
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
|
||||||
} else {
|
} else {
|
||||||
// Initialize unset filter boundaries to run from genesis to chain head
|
// Initialize unset filter boundaries to run from genesis to chain head
|
||||||
from := int64(0)
|
from := int64(0)
|
||||||
@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
|
|||||||
to = query.ToBlock.Int64()
|
to = query.ToBlock.Int64()
|
||||||
}
|
}
|
||||||
// Construct the range filter
|
// Construct the range filter
|
||||||
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
|
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
|
||||||
}
|
}
|
||||||
// Run the filter and return all the logs
|
// Run the filter and return all the logs
|
||||||
logs, err := filter.Logs(ctx)
|
logs, err := filter.Logs(ctx)
|
||||||
@ -827,7 +832,8 @@ type filterBackend struct {
|
|||||||
backend *SimulatedBackend
|
backend *SimulatedBackend
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
|
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
|
||||||
|
|
||||||
func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }
|
func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }
|
||||||
|
|
||||||
func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
|
func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
|
||||||
@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
|
|||||||
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
|
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||||
number := rawdb.ReadHeaderNumber(fb.db, hash)
|
logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config())
|
||||||
if number == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config())
|
|
||||||
if receipts == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
logs := make([][]*types.Log, len(receipts))
|
|
||||||
for i, receipt := range receipts {
|
|
||||||
logs[i] = receipt.Logs
|
|
||||||
}
|
|
||||||
return logs, nil
|
return logs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +163,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
|
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
|
||||||
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
|
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
|
||||||
}
|
}
|
||||||
|
|
||||||
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
|
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
|
||||||
|
|
||||||
// Warn users to migrate if they have a legacy freezer format.
|
// Warn users to migrate if they have a legacy freezer format.
|
||||||
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
|
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
|
||||||
firstIdx := uint64(0)
|
firstIdx := uint64(0)
|
||||||
@ -181,10 +183,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
|
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Configure GraphQL if requested
|
|
||||||
|
// Configure log filter RPC API.
|
||||||
|
filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth)
|
||||||
|
|
||||||
|
// Configure GraphQL if requested.
|
||||||
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
|
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
|
||||||
utils.RegisterGraphQLService(stack, backend, cfg.Node)
|
utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the Ethereum Stats daemon if requested.
|
// Add the Ethereum Stats daemon if requested.
|
||||||
if cfg.Ethstats.URL != "" {
|
if cfg.Ethstats.URL != "" {
|
||||||
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
|
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
|
||||||
|
@ -117,6 +117,7 @@ var (
|
|||||||
utils.CacheSnapshotFlag,
|
utils.CacheSnapshotFlag,
|
||||||
utils.CacheNoPrefetchFlag,
|
utils.CacheNoPrefetchFlag,
|
||||||
utils.CachePreimagesFlag,
|
utils.CachePreimagesFlag,
|
||||||
|
utils.CacheLogSizeFlag,
|
||||||
utils.FDLimitFlag,
|
utils.FDLimitFlag,
|
||||||
utils.ListenPortFlag,
|
utils.ListenPortFlag,
|
||||||
utils.DiscoveryPortFlag,
|
utils.DiscoveryPortFlag,
|
||||||
|
@ -43,6 +43,7 @@ import (
|
|||||||
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
|
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
|
||||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/eth/gasprice"
|
"github.com/ethereum/go-ethereum/eth/gasprice"
|
||||||
"github.com/ethereum/go-ethereum/eth/tracers"
|
"github.com/ethereum/go-ethereum/eth/tracers"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
@ -64,6 +65,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
pcsclite "github.com/gballet/go-libpcsclite"
|
pcsclite "github.com/gballet/go-libpcsclite"
|
||||||
gopsutil "github.com/shirou/gopsutil/mem"
|
gopsutil "github.com/shirou/gopsutil/mem"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
@ -491,6 +493,12 @@ var (
|
|||||||
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
|
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
|
||||||
Category: flags.PerfCategory,
|
Category: flags.PerfCategory,
|
||||||
}
|
}
|
||||||
|
CacheLogSizeFlag = &cli.IntFlag{
|
||||||
|
Name: "cache.blocklogs",
|
||||||
|
Usage: "Size (in number of blocks) of the log cache for filtering",
|
||||||
|
Category: flags.PerfCategory,
|
||||||
|
Value: ethconfig.Defaults.FilterLogCacheSize,
|
||||||
|
}
|
||||||
FDLimitFlag = &cli.IntFlag{
|
FDLimitFlag = &cli.IntFlag{
|
||||||
Name: "fdlimit",
|
Name: "fdlimit",
|
||||||
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
|
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
|
||||||
@ -1808,6 +1816,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
|||||||
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
|
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
|
||||||
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
|
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
|
||||||
}
|
}
|
||||||
|
if ctx.IsSet(CacheLogSizeFlag.Name) {
|
||||||
|
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
|
||||||
|
}
|
||||||
if !ctx.Bool(SnapshotFlag.Name) {
|
if !ctx.Bool(SnapshotFlag.Name) {
|
||||||
// If snap-sync is requested, this flag is also required
|
// If snap-sync is requested, this flag is also required
|
||||||
if cfg.SyncMode == downloader.SnapSync {
|
if cfg.SyncMode == downloader.SnapSync {
|
||||||
@ -2005,21 +2016,34 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend
|
|||||||
return backend.APIBackend, backend
|
return backend.APIBackend, backend
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
|
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
|
||||||
// the given node.
|
|
||||||
func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) {
|
func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) {
|
||||||
if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil {
|
if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil {
|
||||||
Fatalf("Failed to register the Ethereum Stats service: %v", err)
|
Fatalf("Failed to register the Ethereum Stats service: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterGraphQLService is a utility function to construct a new service and register it against a node.
|
// RegisterGraphQLService adds the GraphQL API to the node.
|
||||||
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) {
|
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg *node.Config) {
|
||||||
if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil {
|
err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts)
|
||||||
|
if err != nil {
|
||||||
Fatalf("Failed to register the GraphQL service: %v", err)
|
Fatalf("Failed to register the GraphQL service: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterFilterAPI adds the eth log filtering RPC API to the node.
|
||||||
|
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
|
||||||
|
isLightClient := ethcfg.SyncMode == downloader.LightSync
|
||||||
|
filterSystem := filters.NewFilterSystem(backend, filters.Config{
|
||||||
|
LogCacheSize: ethcfg.FilterLogCacheSize,
|
||||||
|
})
|
||||||
|
stack.RegisterAPIs([]rpc.API{{
|
||||||
|
Namespace: "eth",
|
||||||
|
Service: filters.NewFilterAPI(filterSystem, isLightClient),
|
||||||
|
}})
|
||||||
|
return filterSystem
|
||||||
|
}
|
||||||
|
|
||||||
func SetupMetrics(ctx *cli.Context) {
|
func SetupMetrics(ctx *cli.Context) {
|
||||||
if metrics.Enabled {
|
if metrics.Enabled {
|
||||||
log.Info("Enabling metrics collection")
|
log.Info("Enabling metrics collection")
|
||||||
|
@ -19,7 +19,6 @@ package eth
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -202,17 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
|
|||||||
return b.eth.blockchain.GetReceiptsByHash(hash), nil
|
return b.eth.blockchain.GetReceiptsByHash(hash), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||||
db := b.eth.ChainDb()
|
return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil
|
||||||
number := rawdb.ReadHeaderNumber(db, hash)
|
|
||||||
if number == nil {
|
|
||||||
return nil, fmt.Errorf("failed to get block number for hash %#x", hash)
|
|
||||||
}
|
|
||||||
logs := rawdb.ReadLogs(db, hash, *number, b.eth.blockchain.Config())
|
|
||||||
if logs == nil {
|
|
||||||
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", *number, hash.TerminalString())
|
|
||||||
}
|
|
||||||
return logs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
|
func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/accounts"
|
"github.com/ethereum/go-ethereum/accounts"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -41,7 +40,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/vm"
|
"github.com/ethereum/go-ethereum/core/vm"
|
||||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
"github.com/ethereum/go-ethereum/eth/filters"
|
|
||||||
"github.com/ethereum/go-ethereum/eth/gasprice"
|
"github.com/ethereum/go-ethereum/eth/gasprice"
|
||||||
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
||||||
"github.com/ethereum/go-ethereum/eth/protocols/snap"
|
"github.com/ethereum/go-ethereum/eth/protocols/snap"
|
||||||
@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API {
|
|||||||
}, {
|
}, {
|
||||||
Namespace: "eth",
|
Namespace: "eth",
|
||||||
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
|
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
|
||||||
}, {
|
|
||||||
Namespace: "eth",
|
|
||||||
Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute),
|
|
||||||
}, {
|
}, {
|
||||||
Namespace: "admin",
|
Namespace: "admin",
|
||||||
Service: NewAdminAPI(s),
|
Service: NewAdminAPI(s),
|
||||||
|
@ -83,6 +83,7 @@ var Defaults = Config{
|
|||||||
TrieDirtyCache: 256,
|
TrieDirtyCache: 256,
|
||||||
TrieTimeout: 60 * time.Minute,
|
TrieTimeout: 60 * time.Minute,
|
||||||
SnapshotCache: 102,
|
SnapshotCache: 102,
|
||||||
|
FilterLogCacheSize: 32,
|
||||||
Miner: miner.Config{
|
Miner: miner.Config{
|
||||||
GasCeil: 30000000,
|
GasCeil: 30000000,
|
||||||
GasPrice: big.NewInt(params.GWei),
|
GasPrice: big.NewInt(params.GWei),
|
||||||
@ -171,6 +172,9 @@ type Config struct {
|
|||||||
SnapshotCache int
|
SnapshotCache int
|
||||||
Preimages bool
|
Preimages bool
|
||||||
|
|
||||||
|
// This is the number of blocks for which logs will be cached in the filter system.
|
||||||
|
FilterLogCacheSize int
|
||||||
|
|
||||||
// Mining options
|
// Mining options
|
||||||
Miner miner.Config
|
Miner miner.Config
|
||||||
|
|
||||||
|
@ -48,6 +48,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||||||
TrieTimeout time.Duration
|
TrieTimeout time.Duration
|
||||||
SnapshotCache int
|
SnapshotCache int
|
||||||
Preimages bool
|
Preimages bool
|
||||||
|
FilterLogCacheSize int
|
||||||
Miner miner.Config
|
Miner miner.Config
|
||||||
Ethash ethash.Config
|
Ethash ethash.Config
|
||||||
TxPool core.TxPoolConfig
|
TxPool core.TxPoolConfig
|
||||||
@ -93,6 +94,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||||||
enc.TrieTimeout = c.TrieTimeout
|
enc.TrieTimeout = c.TrieTimeout
|
||||||
enc.SnapshotCache = c.SnapshotCache
|
enc.SnapshotCache = c.SnapshotCache
|
||||||
enc.Preimages = c.Preimages
|
enc.Preimages = c.Preimages
|
||||||
|
enc.FilterLogCacheSize = c.FilterLogCacheSize
|
||||||
enc.Miner = c.Miner
|
enc.Miner = c.Miner
|
||||||
enc.Ethash = c.Ethash
|
enc.Ethash = c.Ethash
|
||||||
enc.TxPool = c.TxPool
|
enc.TxPool = c.TxPool
|
||||||
@ -142,6 +144,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||||||
TrieTimeout *time.Duration
|
TrieTimeout *time.Duration
|
||||||
SnapshotCache *int
|
SnapshotCache *int
|
||||||
Preimages *bool
|
Preimages *bool
|
||||||
|
FilterLogCacheSize *int
|
||||||
Miner *miner.Config
|
Miner *miner.Config
|
||||||
Ethash *ethash.Config
|
Ethash *ethash.Config
|
||||||
TxPool *core.TxPoolConfig
|
TxPool *core.TxPoolConfig
|
||||||
@ -250,6 +253,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||||||
if dec.Preimages != nil {
|
if dec.Preimages != nil {
|
||||||
c.Preimages = *dec.Preimages
|
c.Preimages = *dec.Preimages
|
||||||
}
|
}
|
||||||
|
if dec.FilterLogCacheSize != nil {
|
||||||
|
c.FilterLogCacheSize = *dec.FilterLogCacheSize
|
||||||
|
}
|
||||||
if dec.Miner != nil {
|
if dec.Miner != nil {
|
||||||
c.Miner = *dec.Miner
|
c.Miner = *dec.Miner
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ type filter struct {
|
|||||||
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
|
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
|
||||||
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
||||||
type FilterAPI struct {
|
type FilterAPI struct {
|
||||||
backend Backend
|
sys *FilterSystem
|
||||||
events *EventSystem
|
events *EventSystem
|
||||||
filtersMu sync.Mutex
|
filtersMu sync.Mutex
|
||||||
filters map[rpc.ID]*filter
|
filters map[rpc.ID]*filter
|
||||||
@ -54,14 +54,14 @@ type FilterAPI struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewFilterAPI returns a new FilterAPI instance.
|
// NewFilterAPI returns a new FilterAPI instance.
|
||||||
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
|
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
|
||||||
api := &FilterAPI{
|
api := &FilterAPI{
|
||||||
backend: backend,
|
sys: system,
|
||||||
events: NewEventSystem(backend, lightMode),
|
events: NewEventSystem(system, lightMode),
|
||||||
filters: make(map[rpc.ID]*filter),
|
filters: make(map[rpc.ID]*filter),
|
||||||
timeout: timeout,
|
timeout: system.cfg.Timeout,
|
||||||
}
|
}
|
||||||
go api.timeoutLoop(timeout)
|
go api.timeoutLoop(system.cfg.Timeout)
|
||||||
|
|
||||||
return api
|
return api
|
||||||
}
|
}
|
||||||
@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||||||
var filter *Filter
|
var filter *Filter
|
||||||
if crit.BlockHash != nil {
|
if crit.BlockHash != nil {
|
||||||
// Block filter requested, construct a single-shot filter
|
// Block filter requested, construct a single-shot filter
|
||||||
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
|
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
|
||||||
} else {
|
} else {
|
||||||
// Convert the RPC block numbers into internal representations
|
// Convert the RPC block numbers into internal representations
|
||||||
begin := rpc.LatestBlockNumber.Int64()
|
begin := rpc.LatestBlockNumber.Int64()
|
||||||
@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||||||
end = crit.ToBlock.Int64()
|
end = crit.ToBlock.Int64()
|
||||||
}
|
}
|
||||||
// Construct the range filter
|
// Construct the range filter
|
||||||
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
|
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
|
||||||
}
|
}
|
||||||
// Run the filter and return all the logs
|
// Run the filter and return all the logs
|
||||||
logs, err := filter.Logs(ctx)
|
logs, err := filter.Logs(ctx)
|
||||||
@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
|
|||||||
var filter *Filter
|
var filter *Filter
|
||||||
if f.crit.BlockHash != nil {
|
if f.crit.BlockHash != nil {
|
||||||
// Block filter requested, construct a single-shot filter
|
// Block filter requested, construct a single-shot filter
|
||||||
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
|
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
|
||||||
} else {
|
} else {
|
||||||
// Convert the RPC block numbers into internal representations
|
// Convert the RPC block numbers into internal representations
|
||||||
begin := rpc.LatestBlockNumber.Int64()
|
begin := rpc.LatestBlockNumber.Int64()
|
||||||
@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
|
|||||||
end = f.crit.ToBlock.Int64()
|
end = f.crit.ToBlock.Int64()
|
||||||
}
|
}
|
||||||
// Construct the range filter
|
// Construct the range filter
|
||||||
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
|
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
|
||||||
}
|
}
|
||||||
// Run the filter and return all the logs
|
// Run the filter and return all the logs
|
||||||
logs, err := filter.Logs(ctx)
|
logs, err := filter.Logs(ctx)
|
||||||
|
@ -122,22 +122,27 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
|
|||||||
|
|
||||||
b.Log("Running filter benchmarks...")
|
b.Log("Running filter benchmarks...")
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
var backend *testBackend
|
|
||||||
|
|
||||||
|
var (
|
||||||
|
backend *testBackend
|
||||||
|
sys *FilterSystem
|
||||||
|
)
|
||||||
for i := 0; i < benchFilterCnt; i++ {
|
for i := 0; i < benchFilterCnt; i++ {
|
||||||
if i%20 == 0 {
|
if i%20 == 0 {
|
||||||
db.Close()
|
db.Close()
|
||||||
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false)
|
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false)
|
||||||
backend = &testBackend{db: db, sections: cnt}
|
backend = &testBackend{db: db, sections: cnt}
|
||||||
|
sys = NewFilterSystem(backend, Config{})
|
||||||
}
|
}
|
||||||
var addr common.Address
|
var addr common.Address
|
||||||
addr[0] = byte(i)
|
addr[0] = byte(i)
|
||||||
addr[1] = byte(i / 256)
|
addr[1] = byte(i / 256)
|
||||||
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
|
filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
|
||||||
if _, err := filter.Logs(context.Background()); err != nil {
|
if _, err := filter.Logs(context.Background()); err != nil {
|
||||||
b.Error("filter.Find error:", err)
|
b.Error("filter.Logs error:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
d = time.Since(start)
|
d = time.Since(start)
|
||||||
b.Log("Finished running filter benchmarks")
|
b.Log("Finished running filter benchmarks")
|
||||||
b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
|
b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
|
||||||
@ -171,10 +176,11 @@ func BenchmarkNoBloomBits(b *testing.B) {
|
|||||||
|
|
||||||
clearBloomBits(db)
|
clearBloomBits(db)
|
||||||
|
|
||||||
|
_, sys := newTestFilterSystem(b, db, Config{})
|
||||||
|
|
||||||
b.Log("Running filter benchmarks...")
|
b.Log("Running filter benchmarks...")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
backend := &testBackend{db: db}
|
filter := sys.NewRangeFilter(0, int64(*headNum), []common.Address{{}}, nil)
|
||||||
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
|
|
||||||
filter.Logs(context.Background())
|
filter.Logs(context.Background())
|
||||||
d := time.Since(start)
|
d := time.Since(start)
|
||||||
b.Log("Finished running filter benchmarks")
|
b.Log("Finished running filter benchmarks")
|
||||||
|
@ -22,37 +22,15 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
|
||||||
"github.com/ethereum/go-ethereum/event"
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Backend interface {
|
|
||||||
ChainDb() ethdb.Database
|
|
||||||
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
|
|
||||||
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
|
|
||||||
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
|
|
||||||
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
|
|
||||||
PendingBlockAndReceipts() (*types.Block, types.Receipts)
|
|
||||||
|
|
||||||
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
|
||||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
|
||||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
|
||||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
|
||||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
|
||||||
|
|
||||||
BloomStatus() (uint64, uint64)
|
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter can be used to retrieve and filter logs.
|
// Filter can be used to retrieve and filter logs.
|
||||||
type Filter struct {
|
type Filter struct {
|
||||||
backend Backend
|
sys *FilterSystem
|
||||||
|
|
||||||
db ethdb.Database
|
|
||||||
addresses []common.Address
|
addresses []common.Address
|
||||||
topics [][]common.Hash
|
topics [][]common.Hash
|
||||||
|
|
||||||
@ -64,7 +42,7 @@ type Filter struct {
|
|||||||
|
|
||||||
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
|
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
|
||||||
// figure out whether a particular block is interesting or not.
|
// figure out whether a particular block is interesting or not.
|
||||||
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
|
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||||
// Flatten the address and topic filter clauses into a single bloombits filter
|
// Flatten the address and topic filter clauses into a single bloombits filter
|
||||||
// system. Since the bloombits are not positional, nil topics are permitted,
|
// system. Since the bloombits are not positional, nil topics are permitted,
|
||||||
// which get flattened into a nil byte slice.
|
// which get flattened into a nil byte slice.
|
||||||
@ -83,10 +61,10 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
|
|||||||
}
|
}
|
||||||
filters = append(filters, filter)
|
filters = append(filters, filter)
|
||||||
}
|
}
|
||||||
size, _ := backend.BloomStatus()
|
size, _ := sys.backend.BloomStatus()
|
||||||
|
|
||||||
// Create a generic filter and convert it into a range filter
|
// Create a generic filter and convert it into a range filter
|
||||||
filter := newFilter(backend, addresses, topics)
|
filter := newFilter(sys, addresses, topics)
|
||||||
|
|
||||||
filter.matcher = bloombits.NewMatcher(size, filters)
|
filter.matcher = bloombits.NewMatcher(size, filters)
|
||||||
filter.begin = begin
|
filter.begin = begin
|
||||||
@ -97,21 +75,20 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
|
|||||||
|
|
||||||
// NewBlockFilter creates a new filter which directly inspects the contents of
|
// NewBlockFilter creates a new filter which directly inspects the contents of
|
||||||
// a block to figure out whether it is interesting or not.
|
// a block to figure out whether it is interesting or not.
|
||||||
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||||
// Create a generic filter and convert it into a block filter
|
// Create a generic filter and convert it into a block filter
|
||||||
filter := newFilter(backend, addresses, topics)
|
filter := newFilter(sys, addresses, topics)
|
||||||
filter.block = block
|
filter.block = block
|
||||||
return filter
|
return filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// newFilter creates a generic filter that can either filter based on a block hash,
|
// newFilter creates a generic filter that can either filter based on a block hash,
|
||||||
// or based on range queries. The search criteria needs to be explicitly set.
|
// or based on range queries. The search criteria needs to be explicitly set.
|
||||||
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
|
func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||||
return &Filter{
|
return &Filter{
|
||||||
backend: backend,
|
sys: sys,
|
||||||
addresses: addresses,
|
addresses: addresses,
|
||||||
topics: topics,
|
topics: topics,
|
||||||
db: backend.ChainDb(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,14 +97,14 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha
|
|||||||
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
||||||
// If we're doing singleton block filtering, execute and return
|
// If we're doing singleton block filtering, execute and return
|
||||||
if f.block != (common.Hash{}) {
|
if f.block != (common.Hash{}) {
|
||||||
header, err := f.backend.HeaderByHash(ctx, f.block)
|
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if header == nil {
|
if header == nil {
|
||||||
return nil, errors.New("unknown block")
|
return nil, errors.New("unknown block")
|
||||||
}
|
}
|
||||||
return f.blockLogs(ctx, header)
|
return f.blockLogs(ctx, header, false)
|
||||||
}
|
}
|
||||||
// Short-cut if all we care about is pending logs
|
// Short-cut if all we care about is pending logs
|
||||||
if f.begin == rpc.PendingBlockNumber.Int64() {
|
if f.begin == rpc.PendingBlockNumber.Int64() {
|
||||||
@ -137,7 +114,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||||||
return f.pendingLogs()
|
return f.pendingLogs()
|
||||||
}
|
}
|
||||||
// Figure out the limits of the filter range
|
// Figure out the limits of the filter range
|
||||||
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
|
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
|
||||||
if header == nil {
|
if header == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -156,7 +133,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||||||
var (
|
var (
|
||||||
logs []*types.Log
|
logs []*types.Log
|
||||||
err error
|
err error
|
||||||
size, sections = f.backend.BloomStatus()
|
size, sections = f.sys.backend.BloomStatus()
|
||||||
)
|
)
|
||||||
if indexed := sections * size; indexed > uint64(f.begin) {
|
if indexed := sections * size; indexed > uint64(f.begin) {
|
||||||
if indexed > end {
|
if indexed > end {
|
||||||
@ -192,7 +169,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
|
|||||||
}
|
}
|
||||||
defer session.Close()
|
defer session.Close()
|
||||||
|
|
||||||
f.backend.ServiceFilter(ctx, session)
|
f.sys.backend.ServiceFilter(ctx, session)
|
||||||
|
|
||||||
// Iterate over the matches until exhausted or context closed
|
// Iterate over the matches until exhausted or context closed
|
||||||
var logs []*types.Log
|
var logs []*types.Log
|
||||||
@ -211,11 +188,11 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
|
|||||||
f.begin = int64(number) + 1
|
f.begin = int64(number) + 1
|
||||||
|
|
||||||
// Retrieve the suggested block and pull any truly matching logs
|
// Retrieve the suggested block and pull any truly matching logs
|
||||||
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
||||||
if header == nil || err != nil {
|
if header == nil || err != nil {
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
found, err := f.checkMatches(ctx, header)
|
found, err := f.blockLogs(ctx, header, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
@ -233,11 +210,11 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
|
|||||||
var logs []*types.Log
|
var logs []*types.Log
|
||||||
|
|
||||||
for ; f.begin <= int64(end); f.begin++ {
|
for ; f.begin <= int64(end); f.begin++ {
|
||||||
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
||||||
if header == nil || err != nil {
|
if header == nil || err != nil {
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
found, err := f.blockLogs(ctx, header)
|
found, err := f.blockLogs(ctx, header, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
@ -247,34 +224,34 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
// blockLogs returns the logs matching the filter criteria within a single block.
|
// blockLogs returns the logs matching the filter criteria within a single block.
|
||||||
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
|
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
|
||||||
if bloomFilter(header.Bloom, f.addresses, f.topics) {
|
// Fast track: no filtering criteria
|
||||||
found, err := f.checkMatches(ctx, header)
|
if len(f.addresses) == 0 && len(f.topics) == 0 {
|
||||||
|
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logs, err
|
return nil, err
|
||||||
}
|
}
|
||||||
logs = append(logs, found...)
|
return flatten(list), nil
|
||||||
|
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
|
||||||
|
return f.checkMatches(ctx, header)
|
||||||
}
|
}
|
||||||
return logs, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkMatches checks if the receipts belonging to the given header contain any log events that
|
// checkMatches checks if the receipts belonging to the given header contain any log events that
|
||||||
// match the filter criteria. This function is called when the bloom filter signals a potential match.
|
// match the filter criteria. This function is called when the bloom filter signals a potential match.
|
||||||
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
|
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
|
||||||
// Get the logs of the block
|
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||||
logsList, err := f.backend.GetLogs(ctx, header.Hash())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var unfiltered []*types.Log
|
|
||||||
for _, logs := range logsList {
|
unfiltered := flatten(logsList)
|
||||||
unfiltered = append(unfiltered, logs...)
|
logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
|
||||||
}
|
|
||||||
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
|
|
||||||
if len(logs) > 0 {
|
if len(logs) > 0 {
|
||||||
// We have matching logs, check if we need to resolve full logs via the light client
|
// We have matching logs, check if we need to resolve full logs via the light client
|
||||||
if logs[0].TxHash == (common.Hash{}) {
|
if logs[0].TxHash == (common.Hash{}) {
|
||||||
receipts, err := f.backend.GetReceipts(ctx, header.Hash())
|
receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -291,7 +268,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
|
|||||||
|
|
||||||
// pendingLogs returns the logs matching the filter criteria within the pending block.
|
// pendingLogs returns the logs matching the filter criteria within the pending block.
|
||||||
func (f *Filter) pendingLogs() ([]*types.Log, error) {
|
func (f *Filter) pendingLogs() ([]*types.Log, error) {
|
||||||
block, receipts := f.backend.PendingBlockAndReceipts()
|
block, receipts := f.sys.backend.PendingBlockAndReceipts()
|
||||||
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
|
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
|
||||||
var unfiltered []*types.Log
|
var unfiltered []*types.Log
|
||||||
for _, r := range receipts {
|
for _, r := range receipts {
|
||||||
@ -376,3 +353,11 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
|
|||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func flatten(list [][]*types.Log) []*types.Log {
|
||||||
|
var flat []*types.Log
|
||||||
|
for _, logs := range list {
|
||||||
|
flat = append(flat, logs...)
|
||||||
|
}
|
||||||
|
return flat
|
||||||
|
}
|
||||||
|
@ -27,13 +27,90 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Config represents the configuration of the filter system.
|
||||||
|
type Config struct {
|
||||||
|
LogCacheSize int // maximum number of cached blocks (default: 32)
|
||||||
|
Timeout time.Duration // how long filters stay active (default: 5min)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg Config) withDefaults() Config {
|
||||||
|
if cfg.Timeout == 0 {
|
||||||
|
cfg.Timeout = 5 * time.Minute
|
||||||
|
}
|
||||||
|
if cfg.LogCacheSize == 0 {
|
||||||
|
cfg.LogCacheSize = 32
|
||||||
|
}
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
type Backend interface {
|
||||||
|
ChainDb() ethdb.Database
|
||||||
|
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
|
||||||
|
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
|
||||||
|
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
|
||||||
|
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
|
||||||
|
PendingBlockAndReceipts() (*types.Block, types.Receipts)
|
||||||
|
|
||||||
|
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
||||||
|
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||||
|
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||||
|
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
|
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
|
|
||||||
|
BloomStatus() (uint64, uint64)
|
||||||
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterSystem holds resources shared by all filters.
|
||||||
|
type FilterSystem struct {
|
||||||
|
backend Backend
|
||||||
|
logsCache *lru.Cache
|
||||||
|
cfg *Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFilterSystem creates a filter system.
|
||||||
|
func NewFilterSystem(backend Backend, config Config) *FilterSystem {
|
||||||
|
config = config.withDefaults()
|
||||||
|
|
||||||
|
cache, err := lru.New(config.LogCacheSize)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &FilterSystem{
|
||||||
|
backend: backend,
|
||||||
|
logsCache: cache,
|
||||||
|
cfg: &config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cachedGetLogs loads block logs from the backend and caches the result.
|
||||||
|
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||||
|
cached, ok := sys.logsCache.Get(blockHash)
|
||||||
|
if ok {
|
||||||
|
return cached.([][]*types.Log), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logs, err := sys.backend.GetLogs(ctx, blockHash, number)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if logs == nil {
|
||||||
|
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
|
||||||
|
}
|
||||||
|
sys.logsCache.Add(blockHash, logs)
|
||||||
|
return logs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Type determines the kind of filter and is used to put the filter in to
|
// Type determines the kind of filter and is used to put the filter in to
|
||||||
// the correct bucket when added.
|
// the correct bucket when added.
|
||||||
type Type byte
|
type Type byte
|
||||||
@ -84,6 +161,7 @@ type subscription struct {
|
|||||||
// subscription which match the subscription criteria.
|
// subscription which match the subscription criteria.
|
||||||
type EventSystem struct {
|
type EventSystem struct {
|
||||||
backend Backend
|
backend Backend
|
||||||
|
sys *FilterSystem
|
||||||
lightMode bool
|
lightMode bool
|
||||||
lastHead *types.Header
|
lastHead *types.Header
|
||||||
|
|
||||||
@ -110,9 +188,10 @@ type EventSystem struct {
|
|||||||
//
|
//
|
||||||
// The returned manager has a loop that needs to be stopped with the Stop function
|
// The returned manager has a loop that needs to be stopped with the Stop function
|
||||||
// or by stopping the given mux.
|
// or by stopping the given mux.
|
||||||
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
|
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
|
||||||
m := &EventSystem{
|
m := &EventSystem{
|
||||||
backend: backend,
|
sys: sys,
|
||||||
|
backend: sys.backend,
|
||||||
lightMode: lightMode,
|
lightMode: lightMode,
|
||||||
install: make(chan *subscription),
|
install: make(chan *subscription),
|
||||||
uninstall: make(chan *subscription),
|
uninstall: make(chan *subscription),
|
||||||
@ -405,7 +484,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
|
|||||||
// Get the logs of the block
|
// Get the logs of the block
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
logsList, err := es.backend.GetLogs(ctx, header.Hash())
|
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -39,10 +39,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
deadline = 5 * time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
type testBackend struct {
|
type testBackend struct {
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
sections uint64
|
sections uint64
|
||||||
@ -91,17 +87,8 @@ func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||||
number := rawdb.ReadHeaderNumber(b.db, hash)
|
logs := rawdb.ReadLogs(b.db, hash, number, params.TestChainConfig)
|
||||||
if number == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
receipts := rawdb.ReadReceipts(b.db, hash, *number, params.TestChainConfig)
|
|
||||||
|
|
||||||
logs := make([][]*types.Log, len(receipts))
|
|
||||||
for i, receipt := range receipts {
|
|
||||||
logs[i] = receipt.Logs
|
|
||||||
}
|
|
||||||
return logs, nil
|
return logs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,6 +147,12 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
|
||||||
|
backend := &testBackend{db: db}
|
||||||
|
sys := NewFilterSystem(backend, cfg)
|
||||||
|
return backend, sys
|
||||||
|
}
|
||||||
|
|
||||||
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
|
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
|
||||||
// It creates multiple subscriptions:
|
// It creates multiple subscriptions:
|
||||||
// - one at the start and should receive all posted chain events and a second (blockHashes)
|
// - one at the start and should receive all posted chain events and a second (blockHashes)
|
||||||
@ -169,12 +162,12 @@ func TestBlockSubscription(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
|
genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
|
||||||
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
||||||
chainEvents = []core.ChainEvent{}
|
chainEvents = []core.ChainEvent{}
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, blk := range chain {
|
for _, blk := range chain {
|
||||||
@ -221,9 +214,9 @@ func TestPendingTxFilter(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
|
|
||||||
transactions = []*types.Transaction{
|
transactions = []*types.Transaction{
|
||||||
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
|
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
|
||||||
@ -276,9 +269,9 @@ func TestPendingTxFilter(t *testing.T) {
|
|||||||
// If not it must return an error.
|
// If not it must return an error.
|
||||||
func TestLogFilterCreation(t *testing.T) {
|
func TestLogFilterCreation(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
_, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
|
|
||||||
testCases = []struct {
|
testCases = []struct {
|
||||||
crit FilterCriteria
|
crit FilterCriteria
|
||||||
@ -323,9 +316,9 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
_, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
)
|
)
|
||||||
|
|
||||||
// different situations where log filter creation should fail.
|
// different situations where log filter creation should fail.
|
||||||
@ -346,8 +339,8 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||||||
func TestInvalidGetLogsRequest(t *testing.T) {
|
func TestInvalidGetLogsRequest(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
_, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -370,9 +363,9 @@ func TestLogFilter(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
|
|
||||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||||
@ -484,9 +477,9 @@ func TestPendingLogsSubscription(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||||
api = NewFilterAPI(backend, false, deadline)
|
api = NewFilterAPI(sys, false)
|
||||||
|
|
||||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||||
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
|
||||||
@ -668,10 +661,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
|
|||||||
timeout := 100 * time.Millisecond
|
timeout := 100 * time.Millisecond
|
||||||
|
|
||||||
var (
|
var (
|
||||||
db = rawdb.NewMemoryDatabase()
|
db = rawdb.NewMemoryDatabase()
|
||||||
backend = &testBackend{db: db}
|
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
|
||||||
api = NewFilterAPI(backend, false, timeout)
|
api = NewFilterAPI(sys, false)
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
)
|
)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -44,7 +44,7 @@ func BenchmarkFilters(b *testing.B) {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false)
|
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false)
|
||||||
backend = &testBackend{db: db}
|
_, sys = newTestFilterSystem(b, db, Config{})
|
||||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||||
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||||
addr2 = common.BytesToAddress([]byte("jeff"))
|
addr2 = common.BytesToAddress([]byte("jeff"))
|
||||||
@ -89,7 +89,7 @@ func BenchmarkFilters(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
|
filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
logs, _ := filter.Logs(context.Background())
|
logs, _ := filter.Logs(context.Background())
|
||||||
@ -104,7 +104,7 @@ func TestFilters(t *testing.T) {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false)
|
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false)
|
||||||
backend = &testBackend{db: db}
|
_, sys = newTestFilterSystem(t, db, Config{})
|
||||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||||
addr = crypto.PubkeyToAddress(key1.PublicKey)
|
addr = crypto.PubkeyToAddress(key1.PublicKey)
|
||||||
|
|
||||||
@ -175,14 +175,14 @@ func TestFilters(t *testing.T) {
|
|||||||
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
|
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
|
filter := sys.NewRangeFilter(0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
|
||||||
|
|
||||||
logs, _ := filter.Logs(context.Background())
|
logs, _ := filter.Logs(context.Background())
|
||||||
if len(logs) != 4 {
|
if len(logs) != 4 {
|
||||||
t.Error("expected 4 log, got", len(logs))
|
t.Error("expected 4 log, got", len(logs))
|
||||||
}
|
}
|
||||||
|
|
||||||
filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
|
filter = sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 1 {
|
if len(logs) != 1 {
|
||||||
t.Error("expected 1 log, got", len(logs))
|
t.Error("expected 1 log, got", len(logs))
|
||||||
@ -191,7 +191,7 @@ func TestFilters(t *testing.T) {
|
|||||||
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
|
filter = sys.NewRangeFilter(990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 1 {
|
if len(logs) != 1 {
|
||||||
t.Error("expected 1 log, got", len(logs))
|
t.Error("expected 1 log, got", len(logs))
|
||||||
@ -200,7 +200,7 @@ func TestFilters(t *testing.T) {
|
|||||||
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
|
filter = sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}})
|
||||||
|
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 2 {
|
if len(logs) != 2 {
|
||||||
@ -208,7 +208,7 @@ func TestFilters(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
failHash := common.BytesToHash([]byte("fail"))
|
failHash := common.BytesToHash([]byte("fail"))
|
||||||
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}})
|
filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}})
|
||||||
|
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 0 {
|
if len(logs) != 0 {
|
||||||
@ -216,14 +216,14 @@ func TestFilters(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
failAddr := common.BytesToAddress([]byte("failmenow"))
|
failAddr := common.BytesToAddress([]byte("failmenow"))
|
||||||
filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil)
|
filter = sys.NewRangeFilter(0, -1, []common.Address{failAddr}, nil)
|
||||||
|
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 0 {
|
if len(logs) != 0 {
|
||||||
t.Error("expected 0 log, got", len(logs))
|
t.Error("expected 0 log, got", len(logs))
|
||||||
}
|
}
|
||||||
|
|
||||||
filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
|
filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
|
||||||
|
|
||||||
logs, _ = filter.Logs(context.Background())
|
logs, _ = filter.Logs(context.Background())
|
||||||
if len(logs) != 0 {
|
if len(logs) != 0 {
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/eth"
|
"github.com/ethereum/go-ethereum/eth"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/ethclient"
|
"github.com/ethereum/go-ethereum/ethclient"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
@ -60,6 +61,12 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("can't create new ethereum service: %v", err)
|
t.Fatalf("can't create new ethereum service: %v", err)
|
||||||
}
|
}
|
||||||
|
filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{})
|
||||||
|
n.RegisterAPIs([]rpc.API{{
|
||||||
|
Namespace: "eth",
|
||||||
|
Service: filters.NewFilterAPI(filterSystem, false),
|
||||||
|
}})
|
||||||
|
|
||||||
// Import the test chain.
|
// Import the test chain.
|
||||||
if err := n.Start(); err != nil {
|
if err := n.Start(); err != nil {
|
||||||
t.Fatalf("can't start test node: %v", err)
|
t.Fatalf("can't start test node: %v", err)
|
||||||
|
@ -450,12 +450,36 @@ func (t *Transaction) CreatedContract(ctx context.Context, args BlockNumberArgs)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transaction) Logs(ctx context.Context) (*[]*Log, error) {
|
func (t *Transaction) Logs(ctx context.Context) (*[]*Log, error) {
|
||||||
receipt, err := t.getReceipt(ctx)
|
if _, err := t.resolve(ctx); err != nil {
|
||||||
if err != nil || receipt == nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ret := make([]*Log, 0, len(receipt.Logs))
|
if t.block == nil {
|
||||||
for _, log := range receipt.Logs {
|
return nil, nil
|
||||||
|
}
|
||||||
|
if _, ok := t.block.numberOrHash.Hash(); !ok {
|
||||||
|
header, err := t.r.backend.HeaderByNumberOrHash(ctx, *t.block.numberOrHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
hash := header.Hash()
|
||||||
|
t.block.numberOrHash.BlockHash = &hash
|
||||||
|
}
|
||||||
|
return t.getLogs(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLogs returns log objects for the given tx.
|
||||||
|
// Assumes block hash is resolved.
|
||||||
|
func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) {
|
||||||
|
var (
|
||||||
|
hash, _ = t.block.numberOrHash.Hash()
|
||||||
|
filter = t.r.filterSystem.NewBlockFilter(hash, nil, nil)
|
||||||
|
logs, err = filter.Logs(ctx)
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ret := make([]*Log, 0, len(logs))
|
||||||
|
for _, log := range logs {
|
||||||
ret = append(ret, &Log{
|
ret = append(ret, &Log{
|
||||||
r: t.r,
|
r: t.r,
|
||||||
transaction: t,
|
transaction: t,
|
||||||
@ -978,7 +1002,7 @@ func (b *Block) Logs(ctx context.Context, args struct{ Filter BlockFilterCriteri
|
|||||||
hash = header.Hash()
|
hash = header.Hash()
|
||||||
}
|
}
|
||||||
// Construct the range filter
|
// Construct the range filter
|
||||||
filter := filters.NewBlockFilter(b.r.backend, hash, addresses, topics)
|
filter := b.r.filterSystem.NewBlockFilter(hash, addresses, topics)
|
||||||
|
|
||||||
// Run the filter and return all the logs
|
// Run the filter and return all the logs
|
||||||
return runFilter(ctx, b.r, filter)
|
return runFilter(ctx, b.r, filter)
|
||||||
@ -1137,7 +1161,8 @@ func (p *Pending) EstimateGas(ctx context.Context, args struct {
|
|||||||
|
|
||||||
// Resolver is the top-level object in the GraphQL hierarchy.
|
// Resolver is the top-level object in the GraphQL hierarchy.
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
backend ethapi.Backend
|
backend ethapi.Backend
|
||||||
|
filterSystem *filters.FilterSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) Block(ctx context.Context, args struct {
|
func (r *Resolver) Block(ctx context.Context, args struct {
|
||||||
@ -1284,7 +1309,7 @@ func (r *Resolver) Logs(ctx context.Context, args struct{ Filter FilterCriteria
|
|||||||
topics = *args.Filter.Topics
|
topics = *args.Filter.Topics
|
||||||
}
|
}
|
||||||
// Construct the range filter
|
// Construct the range filter
|
||||||
filter := filters.NewRangeFilter(r.backend, begin, end, addresses, topics)
|
filter := r.filterSystem.NewRangeFilter(begin, end, addresses, topics)
|
||||||
return runFilter(ctx, r, filter)
|
return runFilter(ctx, r, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/eth"
|
"github.com/ethereum/go-ethereum/eth"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
|
||||||
@ -50,7 +51,7 @@ func TestBuildSchema(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer stack.Close()
|
defer stack.Close()
|
||||||
// Make sure the schema can be parsed and matched up to the object model.
|
// Make sure the schema can be parsed and matched up to the object model.
|
||||||
if err := newHandler(stack, nil, []string{}, []string{}); err != nil {
|
if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil {
|
||||||
t.Errorf("Could not construct GraphQL handler: %v", err)
|
t.Errorf("Could not construct GraphQL handler: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -263,7 +264,8 @@ func createGQLService(t *testing.T, stack *node.Node) {
|
|||||||
t.Fatalf("could not create import blocks: %v", err)
|
t.Fatalf("could not create import blocks: %v", err)
|
||||||
}
|
}
|
||||||
// create gql service
|
// create gql service
|
||||||
err = New(stack, ethBackend.APIBackend, []string{}, []string{})
|
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
|
||||||
|
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create graphql service: %v", err)
|
t.Fatalf("could not create graphql service: %v", err)
|
||||||
}
|
}
|
||||||
@ -348,7 +350,8 @@ func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) {
|
|||||||
t.Fatalf("could not create import blocks: %v", err)
|
t.Fatalf("could not create import blocks: %v", err)
|
||||||
}
|
}
|
||||||
// create gql service
|
// create gql service
|
||||||
err = New(stack, ethBackend.APIBackend, []string{}, []string{})
|
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
|
||||||
|
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not create graphql service: %v", err)
|
t.Fatalf("could not create graphql service: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/internal/ethapi"
|
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/graph-gophers/graphql-go"
|
"github.com/graph-gophers/graphql-go"
|
||||||
@ -55,14 +56,14 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a new GraphQL service instance.
|
// New constructs a new GraphQL service instance.
|
||||||
func New(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error {
|
func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
|
||||||
return newHandler(stack, backend, cors, vhosts)
|
return newHandler(stack, backend, filterSystem, cors, vhosts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHandler returns a new `http.Handler` that will answer GraphQL queries.
|
// newHandler returns a new `http.Handler` that will answer GraphQL queries.
|
||||||
// It additionally exports an interactive query browser on the / endpoint.
|
// It additionally exports an interactive query browser on the / endpoint.
|
||||||
func newHandler(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error {
|
func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
|
||||||
q := Resolver{backend}
|
q := Resolver{backend, filterSystem}
|
||||||
|
|
||||||
s, err := graphql.ParseSchema(schema, &q)
|
s, err := graphql.ParseSchema(schema, &q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -27,10 +27,10 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus"
|
"github.com/ethereum/go-ethereum/consensus"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/core/vm"
|
"github.com/ethereum/go-ethereum/core/vm"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
@ -84,16 +84,12 @@ type Backend interface {
|
|||||||
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
|
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
|
||||||
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
|
||||||
|
|
||||||
// Filter API
|
|
||||||
BloomStatus() (uint64, uint64)
|
|
||||||
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
|
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
|
||||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
|
||||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
|
||||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
|
||||||
|
|
||||||
ChainConfig() *params.ChainConfig
|
ChainConfig() *params.ChainConfig
|
||||||
Engine() consensus.Engine
|
Engine() consensus.Engine
|
||||||
|
|
||||||
|
// eth/filters needs to be initialized from this backend type, so methods needed by
|
||||||
|
// it must also be included here.
|
||||||
|
filters.Backend
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAPIs(apiBackend Backend) []rpc.API {
|
func GetAPIs(apiBackend Backend) []rpc.API {
|
||||||
|
@ -168,11 +168,8 @@ func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
|
func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
|
||||||
if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil {
|
return light.GetBlockLogs(ctx, b.eth.odr, hash, number)
|
||||||
return light.GetBlockLogs(ctx, b.eth.odr, hash, *number)
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
|
func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
"github.com/ethereum/go-ethereum/eth/filters"
|
|
||||||
"github.com/ethereum/go-ethereum/eth/gasprice"
|
"github.com/ethereum/go-ethereum/eth/gasprice"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/internal/ethapi"
|
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||||
@ -298,9 +297,6 @@ func (s *LightEthereum) APIs() []rpc.API {
|
|||||||
}, {
|
}, {
|
||||||
Namespace: "eth",
|
Namespace: "eth",
|
||||||
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
|
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
|
||||||
}, {
|
|
||||||
Namespace: "eth",
|
|
||||||
Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute),
|
|
||||||
}, {
|
}, {
|
||||||
Namespace: "net",
|
Namespace: "net",
|
||||||
Service: s.netRPCService,
|
Service: s.netRPCService,
|
||||||
|
Loading…
Reference in New Issue
Block a user