From 1d519854e2bfe8d5f2e8674f4f04ccf9aeaabe84 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 4 Feb 2015 17:28:54 -0800 Subject: [PATCH] Propagate known transactions to new peers on connect --- cmd/mist/assets/examples/coin.html | 8 ++++---- core/filter.go | 23 +++++++++-------------- eth/protocol.go | 12 ++++++++++++ rpc/args.go | 6 +++--- rpc/message.go | 2 +- rpc/packages.go | 1 + 6 files changed, 30 insertions(+), 22 deletions(-) diff --git a/cmd/mist/assets/examples/coin.html b/cmd/mist/assets/examples/coin.html index 070ac94a6..a84a828af 100644 --- a/cmd/mist/assets/examples/coin.html +++ b/cmd/mist/assets/examples/coin.html @@ -57,8 +57,8 @@ "name":"Changed", "type":"event", "inputs": [ - {"name":"to","type":"address","indexed":false}, - {"name":"amount","type":"uint256","indexed":true}, + {"name":"to","type":"address","indexed":true}, + {"name":"amount","type":"uint256","indexed":false}, ], }]; @@ -74,12 +74,12 @@ } var contract = web3.eth.contract(address, desc); - contract.Changed({to: "0xaabb"}).changed(function(e) { + contract.Changed({to: "0xaa"}).changed(function(e) { console.log("e: " + JSON.stringify(e)); }); contract.transact({gas: "10000", gasprice: eth.gasPrice}).send( "0xaa", 10000 ); function reflesh() { - document.querySelector("#balance").innerHTML = contract.call().balance(eth.coinbase); + document.querySelector("#balance").innerHTML = contract.balance(eth.coinbase); var table = document.querySelector("#table"); table.innerHTML = ""; // clear diff --git a/core/filter.go b/core/filter.go index a458165f5..c22996d7e 100644 --- a/core/filter.go +++ b/core/filter.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "fmt" "math" "github.com/ethereum/go-ethereum/core/types" @@ -129,37 +128,33 @@ func (self *Filter) Find() state.Logs { return logs[skip:] } -func includes(addresses [][]byte, a []byte) (found bool) { +func includes(addresses [][]byte, a []byte) bool { for _, addr := range addresses { - fmt.Println("INCLUDES", addr, a) - if bytes.Compare(addr, a) == 0 { - return true + if !bytes.Equal(addr, a) { + return false } } - return + return true } func (self *Filter) FilterLogs(logs state.Logs) state.Logs { - fmt.Println("FILTER LOGS", self.topics) var ret state.Logs // Filter the logs for interesting stuff +Logs: for _, log := range logs { - fmt.Println(log) - if len(self.address) > 0 && !bytes.Equal(self.address, log.Address()) { continue } - for _, topic := range self.topics { - fmt.Println("TOPIC:", topic) - if !includes(log.Topics(), topic) { - continue + max := int(math.Min(float64(len(self.topics)), float64(len(log.Topics())))) + for i := 0; i < max; i++ { + if !bytes.Equal(log.Topics()[i], self.topics[i]) { + continue Logs } } - fmt.Println("APPENDED") ret = append(ret, log) } diff --git a/eth/protocol.go b/eth/protocol.go index 68c52b7ce..d7a7fa910 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -46,6 +46,7 @@ type ethProtocol struct { // used as an argument to EthProtocol type txPool interface { AddTransactions([]*types.Transaction) + GetTransactions() types.Transactions } type chainManager interface { @@ -101,6 +102,7 @@ func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPoo } err = self.handleStatus() if err == nil { + self.propagateTxs() for { err = self.handle() if err != nil { @@ -324,3 +326,13 @@ func (self *ethProtocol) protoErrorDisconnect(code int, format string, params .. } } + +func (self *ethProtocol) propagateTxs() { + transactions := self.txPool.GetTransactions() + iface := make([]interface{}, len(transactions)) + for i, transaction := range transactions { + iface[i] = transaction + } + + self.rw.WriteMsg(p2p.NewMsg(TxMsg, iface...)) +} diff --git a/rpc/args.go b/rpc/args.go index 75eef873d..34e706b98 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -214,7 +214,7 @@ type FilterOptions struct { Earliest int64 Latest int64 Address string - Topics []string + Topic []string Skip int Max int } @@ -224,8 +224,8 @@ func toFilterOptions(options *FilterOptions) core.FilterOptions { opts.Earliest = options.Earliest opts.Latest = options.Latest opts.Address = fromHex(options.Address) - opts.Topics = make([][]byte, len(options.Topics)) - for i, topic := range options.Topics { + opts.Topics = make([][]byte, len(options.Topic)) + for i, topic := range options.Topic { opts.Topics[i] = fromHex(topic) } diff --git a/rpc/message.go b/rpc/message.go index f1e982397..0845a2239 100644 --- a/rpc/message.go +++ b/rpc/message.go @@ -206,7 +206,7 @@ func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) { if len(req.Params) < 1 { return nil, NewErrorResponse(ErrorArguments) } - fmt.Println("filter params", req.Params) + fmt.Println("FILTER PARAMS", string(req.Params[0])) args := new(FilterOptions) r := bytes.NewReader(req.Params[0]) diff --git a/rpc/packages.go b/rpc/packages.go index 06de5ca38..a98d99d6c 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -70,6 +70,7 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi { func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { var id int filter := core.NewFilter(self.xeth.Backend()) + filter.SetOptions(toFilterOptions(args)) filter.LogsCallback = func(logs state.Logs) { self.logMut.Lock() defer self.logMut.Unlock()