rpc, xeth: finish cleaning up xeth

This commit is contained in:
Péter Szilágyi 2015-04-22 18:35:50 +03:00
parent 2b9fd6b40a
commit 978ffd3097
2 changed files with 36 additions and 17 deletions

@ -406,10 +406,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
res, _ := api.xeth().DbGet([]byte(args.Database + args.Key)) res, _ := api.xeth().DbGet([]byte(args.Database + args.Key))
*reply = newHexData(res) *reply = newHexData(res)
case "shh_version": case "shh_version":
// Retrieves the currently running whisper protocol version
*reply = api.xeth().WhisperVersion() *reply = api.xeth().WhisperVersion()
case "shh_post": case "shh_post":
// Injects a new message into the whisper network
args := new(WhisperMessageArgs) args := new(WhisperMessageArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
@ -421,18 +424,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = true *reply = true
case "shh_newIdentity": case "shh_newIdentity":
// Creates a new whisper identity to use for sending/receiving messages
*reply = api.xeth().Whisper().NewIdentity() *reply = api.xeth().Whisper().NewIdentity()
case "shh_hasIdentity": case "shh_hasIdentity":
// Checks if an identity if owned or not
args := new(WhisperIdentityArgs) args := new(WhisperIdentityArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
} }
*reply = api.xeth().Whisper().HasIdentity(args.Identity) *reply = api.xeth().Whisper().HasIdentity(args.Identity)
case "shh_newGroup", "shh_addToGroup":
return NewNotImplementedError(req.Method)
case "shh_newFilter": case "shh_newFilter":
// Create a new filter to watch and match messages with // Create a new filter to watch and match messages with
args := new(WhisperFilterArgs) args := new(WhisperFilterArgs)
@ -443,6 +445,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexNum(big.NewInt(int64(id)).Bytes()) *reply = newHexNum(big.NewInt(int64(id)).Bytes())
case "shh_uninstallFilter": case "shh_uninstallFilter":
// Remove an existing filter watching messages
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
@ -455,7 +458,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
} }
*reply = api.xeth().MessagesChanged(args.Id) *reply = api.xeth().WhisperMessagesChanged(args.Id)
case "shh_getMessages": case "shh_getMessages":
// Retrieve all the cached messages matching a specific, existing filter // Retrieve all the cached messages matching a specific, existing filter
@ -463,7 +466,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
} }
*reply = api.xeth().Messages(args.Id) *reply = api.xeth().WhisperMessages(args.Id)
// case "eth_register": // case "eth_register":
// // Placeholder for actual type // // Placeholder for actual type

@ -452,44 +452,60 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin
return filter.Find() return filter.Find()
} }
// NewWhisperFilter creates and registers a new message filter to watch for
// inbound whisper messages. All parameters at this point are assumed to be
// HEX encoded.
func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int {
// Pre-define the id to be filled later
var id int var id int
callback := func(msg WhisperMessage) {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()
// Callback to delegate core whisper messages to this xeth filter
callback := func(msg WhisperMessage) {
p.messagesMut.RLock() // Only read lock to the filter pool
defer p.messagesMut.RUnlock()
p.messages[id].insert(msg) p.messages[id].insert(msg)
} }
// Initialize the core whisper filter and wrap into xeth
id = p.Whisper().Watch(to, from, topics, callback) id = p.Whisper().Watch(to, from, topics, callback)
p.messagesMut.Lock()
p.messages[id] = newWhisperFilter(id, p.Whisper()) p.messages[id] = newWhisperFilter(id, p.Whisper())
p.messagesMut.Unlock()
return id return id
} }
// UninstallWhisperFilter disables and removes an existing filter.
func (p *XEth) UninstallWhisperFilter(id int) bool { func (p *XEth) UninstallWhisperFilter(id int) bool {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()
if _, ok := p.messages[id]; ok { if _, ok := p.messages[id]; ok {
delete(p.messages, id) delete(p.messages, id)
return true return true
} }
return false return false
} }
func (self *XEth) MessagesChanged(id int) []WhisperMessage { // WhisperMessages retrieves all the known messages that match a specific filter.
self.messagesMut.Lock() func (self *XEth) WhisperMessages(id int) []WhisperMessage {
defer self.messagesMut.Unlock() self.messagesMut.RLock()
defer self.messagesMut.RUnlock()
if self.messages[id] != nil { if self.messages[id] != nil {
return self.messages[id].retrieve() return self.messages[id].messages()
} }
return nil return nil
} }
func (self *XEth) Messages(id int) []WhisperMessage { // WhisperMessagesChanged retrieves all the new messages matched by a filter
self.messagesMut.Lock() // since the last retrieval
defer self.messagesMut.Unlock() func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage {
self.messagesMut.RLock()
defer self.messagesMut.RUnlock()
if self.messages[id] != nil { if self.messages[id] != nil {
return self.messages[id].messages() return self.messages[id].retrieve()
} }
return nil return nil
} }