Merge pull request #911 from karalabe/whisper-graceful-fail

rpc, xeth: fix #881, gracefully handle offline whisper
This commit is contained in:
Jeffrey Wilcke 2015-05-11 02:29:27 -07:00
commit 6674ea8d67
4 changed files with 53 additions and 2 deletions

@ -439,10 +439,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexData(res) *reply = newHexData(res)
case "shh_version": case "shh_version":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Retrieves the currently running whisper protocol version // Retrieves the currently running whisper protocol version
*reply = api.xeth().WhisperVersion() *reply = api.xeth().WhisperVersion()
case "shh_post": case "shh_post":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Injects a new message into the whisper network // Injects a new message into the whisper network
args := new(WhisperMessageArgs) args := new(WhisperMessageArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -455,10 +463,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = true *reply = true
case "shh_newIdentity": case "shh_newIdentity":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Creates a new whisper identity to use for sending/receiving messages // Creates a new whisper identity to use for sending/receiving messages
*reply = api.xeth().Whisper().NewIdentity() *reply = api.xeth().Whisper().NewIdentity()
case "shh_hasIdentity": case "shh_hasIdentity":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Checks if an identity if owned or not // Checks if an identity if owned or not
args := new(WhisperIdentityArgs) args := new(WhisperIdentityArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -467,6 +483,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().Whisper().HasIdentity(args.Identity) *reply = api.xeth().Whisper().HasIdentity(args.Identity)
case "shh_newFilter": case "shh_newFilter":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Create a new filter to watch and match messages with // Create a new filter to watch and match messages with
args := new(WhisperFilterArgs) args := new(WhisperFilterArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -476,6 +496,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexNum(big.NewInt(int64(id)).Bytes()) *reply = newHexNum(big.NewInt(int64(id)).Bytes())
case "shh_uninstallFilter": case "shh_uninstallFilter":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Remove an existing filter watching messages // Remove an existing filter watching messages
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -484,6 +508,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().UninstallWhisperFilter(args.Id) *reply = api.xeth().UninstallWhisperFilter(args.Id)
case "shh_getFilterChanges": case "shh_getFilterChanges":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Retrieve all the new messages arrived since the last request // Retrieve all the new messages arrived since the last request
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -492,12 +520,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().WhisperMessagesChanged(args.Id) *reply = api.xeth().WhisperMessagesChanged(args.Id)
case "shh_getMessages": case "shh_getMessages":
// Short circuit if whisper is not running
if api.xeth().Whisper() == nil {
return NewNotAvailableError(req.Method, "whisper offline")
}
// Retrieve all the cached messages matching a specific, existing filter // Retrieve all the cached messages matching a specific, existing filter
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
} }
*reply = api.xeth().WhisperMessages(args.Id) *reply = api.xeth().WhisperMessages(args.Id)
case "eth_hashrate": case "eth_hashrate":
*reply = newHexNum(api.xeth().HashRate()) *reply = newHexNum(api.xeth().HashRate())

@ -116,7 +116,7 @@ func RpcResponse(api *EthereumApi, request *RpcRequest) *interface{} {
switch reserr.(type) { switch reserr.(type) {
case nil: case nil:
response = &RpcSuccessResponse{Jsonrpc: jsonrpcver, Id: request.Id, Result: reply} response = &RpcSuccessResponse{Jsonrpc: jsonrpcver, Id: request.Id, Result: reply}
case *NotImplementedError: case *NotImplementedError, *NotAvailableError:
jsonerr := &RpcErrorObject{-32601, reserr.Error()} jsonerr := &RpcErrorObject{-32601, reserr.Error()}
response = &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: request.Id, Error: jsonerr} response = &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: request.Id, Error: jsonerr}
case *DecodeParamError, *InsufficientParamsError, *ValidationError, *InvalidTypeError: case *DecodeParamError, *InsufficientParamsError, *ValidationError, *InvalidTypeError:

@ -209,6 +209,22 @@ func NewNotImplementedError(method string) *NotImplementedError {
} }
} }
type NotAvailableError struct {
Method string
Reason string
}
func (e *NotAvailableError) Error() string {
return fmt.Sprintf("%s method not available: %s", e.Method, e.Reason)
}
func NewNotAvailableError(method string, reason string) *NotAvailableError {
return &NotAvailableError{
Method: method,
Reason: reason,
}
}
type DecodeParamError struct { type DecodeParamError struct {
err string err string
} }

@ -79,7 +79,6 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth {
xeth := &XEth{ xeth := &XEth{
backend: eth, backend: eth,
frontend: frontend, frontend: frontend,
whisper: NewWhisper(eth.Whisper()),
quit: make(chan struct{}), quit: make(chan struct{}),
filterManager: filter.NewFilterManager(eth.EventMux()), filterManager: filter.NewFilterManager(eth.EventMux()),
logQueue: make(map[int]*logQueue), logQueue: make(map[int]*logQueue),
@ -88,6 +87,9 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth {
messages: make(map[int]*whisperFilter), messages: make(map[int]*whisperFilter),
agent: miner.NewRemoteAgent(), agent: miner.NewRemoteAgent(),
} }
if eth.Whisper() != nil {
xeth.whisper = NewWhisper(eth.Whisper())
}
eth.Miner().Register(xeth.agent) eth.Miner().Register(xeth.agent)
if frontend == nil { if frontend == nil {
xeth.frontend = dummyFrontend{} xeth.frontend = dummyFrontend{}