Reimplemented message filters for rpc calls

This commit is contained in:
obscuren 2015-01-29 16:52:00 +01:00
parent ddf17d93ac
commit 6488a392a3
6 changed files with 207 additions and 16 deletions

@ -17,7 +17,7 @@
var originalBalance = web3.toDecimal(balance); var originalBalance = web3.toDecimal(balance);
document.getElementById('original').innerText = 'original balance: ' + originalBalance + ' watching...'; document.getElementById('original').innerText = 'original balance: ' + originalBalance + ' watching...';
web3.eth.watch({altered: coinbase}).changed(function() { var filter = web3.eth.watch({address: coinbase}).changed(function() {
balance = web3.eth.balanceAt(coinbase) balance = web3.eth.balanceAt(coinbase)
var currentBalance = web3.toDecimal(balance); var currentBalance = web3.toDecimal(balance);
document.getElementById("current").innerText = 'current: ' + currentBalance; document.getElementById("current").innerText = 'current: ' + currentBalance;

@ -12,6 +12,17 @@ type AccountChange struct {
Address, StateAddress []byte Address, StateAddress []byte
} }
type FilterOptions struct {
Earliest int64
Latest int64
Address []byte
Topics [][]byte
Skip int
Max int
}
// Filtering interface // Filtering interface
type Filter struct { type Filter struct {
eth EthManager eth EthManager
@ -32,6 +43,16 @@ func NewFilter(eth EthManager) *Filter {
return &Filter{eth: eth} return &Filter{eth: eth}
} }
func (self *Filter) SetOptions(options FilterOptions) {
self.earliest = options.Earliest
self.latest = options.Latest
self.skip = options.Skip
self.max = options.Max
self.address = options.Address
self.topics = options.Topics
}
// Set the earliest and latest block for filtering. // Set the earliest and latest block for filtering.
// -1 = latest block (i.e., the current block) // -1 = latest block (i.e., the current block)
// hash = particular hash from-to // hash = particular hash from-to

@ -1,6 +1,7 @@
package rpc package rpc
import "encoding/json" import "encoding/json"
import "github.com/ethereum/go-ethereum/core"
type GetBlockArgs struct { type GetBlockArgs struct {
BlockNumber int32 BlockNumber int32
@ -36,10 +37,6 @@ type NewTxArgs struct {
Data string `json:"data"` Data string `json:"data"`
} }
// type TxResponse struct {
// Hash string
// }
func (a *NewTxArgs) requirements() error { func (a *NewTxArgs) requirements() error {
if a.Gas == "" { if a.Gas == "" {
return NewErrorResponse("Transact requires a 'gas' value as argument") return NewErrorResponse("Transact requires a 'gas' value as argument")
@ -195,3 +192,29 @@ func (obj *Sha3Args) UnmarshalJSON(b []byte) (err error) {
} }
return return
} }
type FilterOptions struct {
Earliest int64
Latest int64
Address string
Topics []string
Skip int
Max int
}
func toFilterOptions(options *FilterOptions) core.FilterOptions {
var opts core.FilterOptions
opts.Earliest = options.Earliest
opts.Latest = options.Latest
opts.Address = fromHex(options.Address)
opts.Topics = make([][]byte, len(options.Topics))
for i, topic := range options.Topics {
opts.Topics[i] = fromHex(topic)
}
return opts
}
type FilterChangedArgs struct {
n int
}

@ -21,6 +21,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/state"
) )
const ( const (
@ -184,3 +186,56 @@ func (req *RpcRequest) ToGetCodeAtArgs() (*GetCodeAtArgs, error) {
rpclogger.DebugDetailf("%T %v", args, args) rpclogger.DebugDetailf("%T %v", args, args)
return args, nil return args, nil
} }
func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
if len(req.Params) < 1 {
return nil, NewErrorResponse(ErrorArguments)
}
args := new(FilterOptions)
r := bytes.NewReader(req.Params[0])
err := json.NewDecoder(r).Decode(args)
if err != nil {
return nil, NewErrorResponse(ErrorDecodeArgs)
}
rpclogger.DebugDetailf("%T %v", args, args)
return args, nil
}
func (req *RpcRequest) ToFilterChangedArgs() (int, error) {
if len(req.Params) < 1 {
return 0, NewErrorResponse(ErrorArguments)
}
var id int
r := bytes.NewReader(req.Params[0])
err := json.NewDecoder(r).Decode(&id)
if err != nil {
return 0, NewErrorResponse(ErrorDecodeArgs)
}
rpclogger.DebugDetailf("%T %v", id, id)
return id, nil
}
type Log struct {
Address string `json:"address"`
Topics []string `json:"topics"`
Data string `json:"data"`
}
func toLogs(logs state.Logs) (ls []Log) {
ls = make([]Log, len(logs))
for i, log := range logs {
var l Log
l.Topics = make([]string, len(log.Topics()))
l.Address = toHex(log.Address())
l.Data = toHex(log.Data())
for j, topic := range log.Topics() {
l.Topics[j] = toHex(topic)
}
ls[i] = l
}
return
}

@ -29,9 +29,13 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"strings" "strings"
"sync"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/event/filter"
"github.com/ethereum/go-ethereum/state"
"github.com/ethereum/go-ethereum/xeth" "github.com/ethereum/go-ethereum/xeth"
) )
@ -53,12 +57,79 @@ type RpcServer interface {
Stop() Stop()
} }
func NewEthereumApi(xeth *xeth.XEth) *EthereumApi {
return &EthereumApi{xeth: xeth}
}
type EthereumApi struct { type EthereumApi struct {
xeth *xeth.XEth xeth *xeth.XEth
filterManager *filter.FilterManager
mut sync.RWMutex
logs map[int]state.Logs
}
func NewEthereumApi(xeth *xeth.XEth) *EthereumApi {
api := &EthereumApi{
xeth: xeth,
filterManager: filter.NewFilterManager(xeth.Backend().EventMux()),
logs: make(map[int]state.Logs),
}
go api.filterManager.Start()
return api
}
func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error {
var id int
filter := core.NewFilter(self.xeth.Backend())
filter.LogsCallback = func(logs state.Logs) {
self.mut.Lock()
defer self.mut.Unlock()
self.logs[id] = append(self.logs[id], logs...)
}
id = self.filterManager.InstallFilter(filter)
*reply = id
return nil
}
type Log struct {
Address string `json:"address"`
Topics []string `json:"topics"`
Data string `json:"data"`
}
func toLogs(logs state.Logs) (ls []Log) {
ls = make([]Log, len(logs))
for i, log := range logs {
var l Log
l.Topics = make([]string, len(log.Topics()))
l.Address = toHex(log.Address())
l.Data = toHex(log.Data())
for j, topic := range log.Topics() {
l.Topics[j] = toHex(topic)
}
ls[i] = l
}
return
}
func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
self.mut.RLock()
defer self.mut.RUnlock()
*reply = toLogs(self.logs[id])
self.logs[id] = nil // empty the logs
return nil
}
func (self *EthereumApi) Logs(id int, reply *interface{}) error {
filter := self.filterManager.GetFilter(id)
*reply = toLogs(filter.Find())
return nil
} }
func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error { func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error {
@ -162,7 +233,7 @@ func (p *EthereumApi) GetBalanceAt(args *GetBalanceArgs, reply *interface{}) err
return err return err
} }
state := p.xeth.State().SafeGet(args.Address) state := p.xeth.State().SafeGet(args.Address)
*reply = BalanceRes{Balance: state.Balance().String(), Address: args.Address} *reply = toHex(state.Balance().Bytes())
return nil return nil
} }
@ -234,6 +305,18 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
return err return err
} }
return p.Call(args, reply) return p.Call(args, reply)
case "eth_newFilter":
args, err := req.ToFilterArgs()
if err != nil {
return err
}
return p.NewFilter(args, reply)
case "eth_changed":
args, err := req.ToFilterChangedArgs()
if err != nil {
return err
}
return p.FilterChanged(args, reply)
case "web3_sha3": case "web3_sha3":
args, err := req.ToSha3Args() args, err := req.ToSha3Args()
if err != nil { if err != nil {

@ -12,7 +12,9 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/state"
) )
@ -22,19 +24,22 @@ var pipelogger = logger.NewLogger("XETH")
type Backend interface { type Backend interface {
BlockProcessor() *core.BlockProcessor BlockProcessor() *core.BlockProcessor
ChainManager() *core.ChainManager ChainManager() *core.ChainManager
KeyManager() *crypto.KeyManager TxPool() *core.TxPool
PeerCount() int
IsMining() bool IsMining() bool
IsListening() bool IsListening() bool
PeerCount() int Peers() []*p2p.Peer
KeyManager() *crypto.KeyManager
ClientIdentity() p2p.ClientIdentity
Db() ethutil.Database Db() ethutil.Database
TxPool() *core.TxPool EventMux() *event.TypeMux
} }
type XEth struct { type XEth struct {
eth Backend eth Backend
blockProcessor *core.BlockProcessor blockProcessor *core.BlockProcessor
chainManager *core.ChainManager chainManager *core.ChainManager
world *State state *State
} }
func New(eth Backend) *XEth { func New(eth Backend) *XEth {
@ -43,12 +48,16 @@ func New(eth Backend) *XEth {
blockProcessor: eth.BlockProcessor(), blockProcessor: eth.BlockProcessor(),
chainManager: eth.ChainManager(), chainManager: eth.ChainManager(),
} }
xeth.world = NewState(xeth) xeth.state = NewState(xeth)
return xeth return xeth
} }
func (self *XEth) State() *State { return self.world } func (self *XEth) Backend() Backend {
return self.eth
}
func (self *XEth) State() *State { return self.state }
func (self *XEth) BlockByHash(strHash string) *Block { func (self *XEth) BlockByHash(strHash string) *Block {
hash := fromHex(strHash) hash := fromHex(strHash)