Added JSFilter type

This commit is contained in:
obscuren 2014-08-20 09:59:09 +02:00
parent b0ae61c652
commit b97ea0e447

@ -2,10 +2,13 @@ package ethpipe
import ( import (
"encoding/json" "encoding/json"
"fmt"
"sync/atomic" "sync/atomic"
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
) )
@ -74,7 +77,8 @@ func (self *JSPipe) NumberToHuman(balance string) string {
} }
func (self *JSPipe) StorageAt(addr, storageAddr string) string { func (self *JSPipe) StorageAt(addr, storageAddr string) string {
return self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr)).Str() storage := self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr))
return storage.BigInt().String()
} }
func (self *JSPipe) TxCountAt(address string) int { func (self *JSPipe) TxCountAt(address string) int {
@ -186,10 +190,45 @@ func (self *JSPipe) CompileMutan(code string) string {
return ethutil.Bytes2Hex(data) return ethutil.Bytes2Hex(data)
} }
func (self *JSPipe) Messages(object map[string]interface{}) string { func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter {
filter := ethchain.NewFilterFromMap(object, self.obj) return NewJSFilterFromMap(object, self.Pipe.obj)
/*} else if str, ok := object.(string); ok {
println("str")
return NewJSFilterFromString(str, self.Pipe.obj)
*/
}
messages := filter.Find() func (self *JSPipe) Messages(object map[string]interface{}) string {
filter := self.Watch(object)
defer filter.Uninstall()
return filter.Messages()
}
type JSFilter struct {
eth ethchain.EthManager
*ethchain.Filter
quit chan bool
BlockCallback func(*ethchain.Block)
MessageCallback func(ethstate.Messages)
}
func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter {
filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil}
go filter.mainLoop()
return filter
}
func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter {
return nil
}
func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string {
var msgs []JSMessage var msgs []JSMessage
for _, m := range messages { for _, m := range messages {
msgs = append(msgs, NewJSMessage(m)) msgs = append(msgs, NewJSMessage(m))
@ -202,3 +241,44 @@ func (self *JSPipe) Messages(object map[string]interface{}) string {
return string(b) return string(b)
} }
func (self *JSFilter) Messages() string {
return self.MessagesToJson(self.Find())
}
func (self *JSFilter) mainLoop() {
blockChan := make(chan ethreact.Event, 1)
messageChan := make(chan ethreact.Event, 1)
// Subscribe to events
reactor := self.eth.Reactor()
reactor.Subscribe("newBlock", blockChan)
reactor.Subscribe("messages", messageChan)
out:
for {
select {
case <-self.quit:
break out
case block := <-blockChan:
if block, ok := block.Resource.(*ethchain.Block); ok {
if self.BlockCallback != nil {
self.BlockCallback(block)
}
}
case msg := <-messageChan:
if messages, ok := msg.Resource.(ethstate.Messages); ok {
if self.MessageCallback != nil {
msgs := self.FilterMessages(messages)
self.MessageCallback(msgs)
}
}
}
}
}
func (self *JSFilter) Changed(object interface{}) {
fmt.Printf("%T\n", object)
}
func (self *JSFilter) Uninstall() {
self.quit <- true
}